diff options
-rw-r--r-- | .envrc | 2 | ||||
-rw-r--r-- | COPYING | 661 | ||||
-rw-r--r-- | Makefile.am | 1 | ||||
-rw-r--r-- | guile.am | 2 | ||||
-rw-r--r-- | guix-dev.scm | 2 | ||||
-rw-r--r-- | nar-herder/cached-compression.scm | 753 | ||||
-rw-r--r-- | nar-herder/database.scm | 1006 | ||||
-rw-r--r-- | nar-herder/mirror.scm | 96 | ||||
-rw-r--r-- | nar-herder/recent-changes.scm | 156 | ||||
-rw-r--r-- | nar-herder/server.scm | 784 | ||||
-rw-r--r-- | nar-herder/storage.scm | 719 | ||||
-rw-r--r-- | nar-herder/utils.scm | 837 | ||||
-rw-r--r-- | scripts/nar-herder.in | 476 |
13 files changed, 4498 insertions, 997 deletions
@@ -2,7 +2,7 @@ export GUILE_LOAD_PATH="" export GUILE_LOAD_COMPILED_PATH="" -use guix --no-grafts -l guix-dev.scm +use guix --no-grafts -D -f guix-dev.scm export GUILE_LOAD_COMPILED_PATH="$PWD:$PWD/tests:$GUILE_LOAD_COMPILED_PATH" export GUILE_LOAD_PATH="$PWD:$GUILE_LOAD_PATH" @@ -0,0 +1,661 @@ + GNU AFFERO GENERAL PUBLIC LICENSE + Version 3, 19 November 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/> + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The GNU Affero General Public License is a free, copyleft license for +software and other kinds of works, specifically designed to ensure +cooperation with the community in the case of network server software. + + The licenses for most software and other practical works are designed +to take away your freedom to share and change the works. By contrast, +our General Public Licenses are intended to guarantee your freedom to +share and change all versions of a program--to make sure it remains free +software for all its users. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +them if you wish), that you receive source code or can get it if you +want it, that you can change the software or use pieces of it in new +free programs, and that you know you can do these things. + + Developers that use our General Public Licenses protect your rights +with two steps: (1) assert copyright on the software, and (2) offer +you this License which gives you legal permission to copy, distribute +and/or modify the software. + + A secondary benefit of defending all users' freedom is that +improvements made in alternate versions of the program, if they +receive widespread use, become available for other developers to +incorporate. 
Many developers of free software are heartened and +encouraged by the resulting cooperation. However, in the case of +software used on network servers, this result may fail to come about. +The GNU General Public License permits making a modified version and +letting the public access it on a server without ever releasing its +source code to the public. + + The GNU Affero General Public License is designed specifically to +ensure that, in such cases, the modified source code becomes available +to the community. It requires the operator of a network server to +provide the source code of the modified version running there to the +users of that server. Therefore, public use of a modified version, on +a publicly accessible server, gives the public access to the source +code of the modified version. + + An older license, called the Affero General Public License and +published by Affero, was designed to accomplish similar goals. This is +a different license, not a version of the Affero GPL, but Affero has +released a new version of the Affero GPL which permits relicensing under +this license. + + The precise terms and conditions for copying, distribution and +modification follow. + + TERMS AND CONDITIONS + + 0. Definitions. + + "This License" refers to version 3 of the GNU Affero General Public License. + + "Copyright" also means copyright-like laws that apply to other kinds of +works, such as semiconductor masks. + + "The Program" refers to any copyrightable work licensed under this +License. Each licensee is addressed as "you". "Licensees" and +"recipients" may be individuals or organizations. + + To "modify" a work means to copy from or adapt all or part of the work +in a fashion requiring copyright permission, other than the making of an +exact copy. The resulting work is called a "modified version" of the +earlier work or a work "based on" the earlier work. + + A "covered work" means either the unmodified Program or a work based +on the Program. 
+ + To "propagate" a work means to do anything with it that, without +permission, would make you directly or secondarily liable for +infringement under applicable copyright law, except executing it on a +computer or modifying a private copy. Propagation includes copying, +distribution (with or without modification), making available to the +public, and in some countries other activities as well. + + To "convey" a work means any kind of propagation that enables other +parties to make or receive copies. Mere interaction with a user through +a computer network, with no transfer of a copy, is not conveying. + + An interactive user interface displays "Appropriate Legal Notices" +to the extent that it includes a convenient and prominently visible +feature that (1) displays an appropriate copyright notice, and (2) +tells the user that there is no warranty for the work (except to the +extent that warranties are provided), that licensees may convey the +work under this License, and how to view a copy of this License. If +the interface presents a list of user commands or options, such as a +menu, a prominent item in the list meets this criterion. + + 1. Source Code. + + The "source code" for a work means the preferred form of the work +for making modifications to it. "Object code" means any non-source +form of a work. + + A "Standard Interface" means an interface that either is an official +standard defined by a recognized standards body, or, in the case of +interfaces specified for a particular programming language, one that +is widely used among developers working in that language. 
+ + The "System Libraries" of an executable work include anything, other +than the work as a whole, that (a) is included in the normal form of +packaging a Major Component, but which is not part of that Major +Component, and (b) serves only to enable use of the work with that +Major Component, or to implement a Standard Interface for which an +implementation is available to the public in source code form. A +"Major Component", in this context, means a major essential component +(kernel, window system, and so on) of the specific operating system +(if any) on which the executable work runs, or a compiler used to +produce the work, or an object code interpreter used to run it. + + The "Corresponding Source" for a work in object code form means all +the source code needed to generate, install, and (for an executable +work) run the object code and to modify the work, including scripts to +control those activities. However, it does not include the work's +System Libraries, or general-purpose tools or generally available free +programs which are used unmodified in performing those activities but +which are not part of the work. For example, Corresponding Source +includes interface definition files associated with source files for +the work, and the source code for shared libraries and dynamically +linked subprograms that the work is specifically designed to require, +such as by intimate data communication or control flow between those +subprograms and other parts of the work. + + The Corresponding Source need not include anything that users +can regenerate automatically from other parts of the Corresponding +Source. + + The Corresponding Source for a work in source code form is that +same work. + + 2. Basic Permissions. + + All rights granted under this License are granted for the term of +copyright on the Program, and are irrevocable provided the stated +conditions are met. This License explicitly affirms your unlimited +permission to run the unmodified Program. 
The output from running a +covered work is covered by this License only if the output, given its +content, constitutes a covered work. This License acknowledges your +rights of fair use or other equivalent, as provided by copyright law. + + You may make, run and propagate covered works that you do not +convey, without conditions so long as your license otherwise remains +in force. You may convey covered works to others for the sole purpose +of having them make modifications exclusively for you, or provide you +with facilities for running those works, provided that you comply with +the terms of this License in conveying all material for which you do +not control copyright. Those thus making or running the covered works +for you must do so exclusively on your behalf, under your direction +and control, on terms that prohibit them from making any copies of +your copyrighted material outside their relationship with you. + + Conveying under any other circumstances is permitted solely under +the conditions stated below. Sublicensing is not allowed; section 10 +makes it unnecessary. + + 3. Protecting Users' Legal Rights From Anti-Circumvention Law. + + No covered work shall be deemed part of an effective technological +measure under any applicable law fulfilling obligations under article +11 of the WIPO copyright treaty adopted on 20 December 1996, or +similar laws prohibiting or restricting circumvention of such +measures. + + When you convey a covered work, you waive any legal power to forbid +circumvention of technological measures to the extent such circumvention +is effected by exercising rights under this License with respect to +the covered work, and you disclaim any intention to limit operation or +modification of the work as a means of enforcing, against the work's +users, your or third parties' legal rights to forbid circumvention of +technological measures. + + 4. Conveying Verbatim Copies. 
+ + You may convey verbatim copies of the Program's source code as you +receive it, in any medium, provided that you conspicuously and +appropriately publish on each copy an appropriate copyright notice; +keep intact all notices stating that this License and any +non-permissive terms added in accord with section 7 apply to the code; +keep intact all notices of the absence of any warranty; and give all +recipients a copy of this License along with the Program. + + You may charge any price or no price for each copy that you convey, +and you may offer support or warranty protection for a fee. + + 5. Conveying Modified Source Versions. + + You may convey a work based on the Program, or the modifications to +produce it from the Program, in the form of source code under the +terms of section 4, provided that you also meet all of these conditions: + + a) The work must carry prominent notices stating that you modified + it, and giving a relevant date. + + b) The work must carry prominent notices stating that it is + released under this License and any conditions added under section + 7. This requirement modifies the requirement in section 4 to + "keep intact all notices". + + c) You must license the entire work, as a whole, under this + License to anyone who comes into possession of a copy. This + License will therefore apply, along with any applicable section 7 + additional terms, to the whole of the work, and all its parts, + regardless of how they are packaged. This License gives no + permission to license the work in any other way, but it does not + invalidate such permission if you have separately received it. + + d) If the work has interactive user interfaces, each must display + Appropriate Legal Notices; however, if the Program has interactive + interfaces that do not display Appropriate Legal Notices, your + work need not make them do so. 
+ + A compilation of a covered work with other separate and independent +works, which are not by their nature extensions of the covered work, +and which are not combined with it such as to form a larger program, +in or on a volume of a storage or distribution medium, is called an +"aggregate" if the compilation and its resulting copyright are not +used to limit the access or legal rights of the compilation's users +beyond what the individual works permit. Inclusion of a covered work +in an aggregate does not cause this License to apply to the other +parts of the aggregate. + + 6. Conveying Non-Source Forms. + + You may convey a covered work in object code form under the terms +of sections 4 and 5, provided that you also convey the +machine-readable Corresponding Source under the terms of this License, +in one of these ways: + + a) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by the + Corresponding Source fixed on a durable physical medium + customarily used for software interchange. + + b) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by a + written offer, valid for at least three years and valid for as + long as you offer spare parts or customer support for that product + model, to give anyone who possesses the object code either (1) a + copy of the Corresponding Source for all the software in the + product that is covered by this License, on a durable physical + medium customarily used for software interchange, for a price no + more than your reasonable cost of physically performing this + conveying of source, or (2) access to copy the + Corresponding Source from a network server at no charge. + + c) Convey individual copies of the object code with a copy of the + written offer to provide the Corresponding Source. 
This + alternative is allowed only occasionally and noncommercially, and + only if you received the object code with such an offer, in accord + with subsection 6b. + + d) Convey the object code by offering access from a designated + place (gratis or for a charge), and offer equivalent access to the + Corresponding Source in the same way through the same place at no + further charge. You need not require recipients to copy the + Corresponding Source along with the object code. If the place to + copy the object code is a network server, the Corresponding Source + may be on a different server (operated by you or a third party) + that supports equivalent copying facilities, provided you maintain + clear directions next to the object code saying where to find the + Corresponding Source. Regardless of what server hosts the + Corresponding Source, you remain obligated to ensure that it is + available for as long as needed to satisfy these requirements. + + e) Convey the object code using peer-to-peer transmission, provided + you inform other peers where the object code and Corresponding + Source of the work are being offered to the general public at no + charge under subsection 6d. + + A separable portion of the object code, whose source code is excluded +from the Corresponding Source as a System Library, need not be +included in conveying the object code work. + + A "User Product" is either (1) a "consumer product", which means any +tangible personal property which is normally used for personal, family, +or household purposes, or (2) anything designed or sold for incorporation +into a dwelling. In determining whether a product is a consumer product, +doubtful cases shall be resolved in favor of coverage. 
For a particular +product received by a particular user, "normally used" refers to a +typical or common use of that class of product, regardless of the status +of the particular user or of the way in which the particular user +actually uses, or expects or is expected to use, the product. A product +is a consumer product regardless of whether the product has substantial +commercial, industrial or non-consumer uses, unless such uses represent +the only significant mode of use of the product. + + "Installation Information" for a User Product means any methods, +procedures, authorization keys, or other information required to install +and execute modified versions of a covered work in that User Product from +a modified version of its Corresponding Source. The information must +suffice to ensure that the continued functioning of the modified object +code is in no case prevented or interfered with solely because +modification has been made. + + If you convey an object code work under this section in, or with, or +specifically for use in, a User Product, and the conveying occurs as +part of a transaction in which the right of possession and use of the +User Product is transferred to the recipient in perpetuity or for a +fixed term (regardless of how the transaction is characterized), the +Corresponding Source conveyed under this section must be accompanied +by the Installation Information. But this requirement does not apply +if neither you nor any third party retains the ability to install +modified object code on the User Product (for example, the work has +been installed in ROM). + + The requirement to provide Installation Information does not include a +requirement to continue to provide support service, warranty, or updates +for a work that has been modified or installed by the recipient, or for +the User Product in which it has been modified or installed. 
Access to a +network may be denied when the modification itself materially and +adversely affects the operation of the network or violates the rules and +protocols for communication across the network. + + Corresponding Source conveyed, and Installation Information provided, +in accord with this section must be in a format that is publicly +documented (and with an implementation available to the public in +source code form), and must require no special password or key for +unpacking, reading or copying. + + 7. Additional Terms. + + "Additional permissions" are terms that supplement the terms of this +License by making exceptions from one or more of its conditions. +Additional permissions that are applicable to the entire Program shall +be treated as though they were included in this License, to the extent +that they are valid under applicable law. If additional permissions +apply only to part of the Program, that part may be used separately +under those permissions, but the entire Program remains governed by +this License without regard to the additional permissions. + + When you convey a copy of a covered work, you may at your option +remove any additional permissions from that copy, or from any part of +it. (Additional permissions may be written to require their own +removal in certain cases when you modify the work.) You may place +additional permissions on material, added by you to a covered work, +for which you have or can give appropriate copyright permission. 
+ + Notwithstanding any other provision of this License, for material you +add to a covered work, you may (if authorized by the copyright holders of +that material) supplement the terms of this License with terms: + + a) Disclaiming warranty or limiting liability differently from the + terms of sections 15 and 16 of this License; or + + b) Requiring preservation of specified reasonable legal notices or + author attributions in that material or in the Appropriate Legal + Notices displayed by works containing it; or + + c) Prohibiting misrepresentation of the origin of that material, or + requiring that modified versions of such material be marked in + reasonable ways as different from the original version; or + + d) Limiting the use for publicity purposes of names of licensors or + authors of the material; or + + e) Declining to grant rights under trademark law for use of some + trade names, trademarks, or service marks; or + + f) Requiring indemnification of licensors and authors of that + material by anyone who conveys the material (or modified versions of + it) with contractual assumptions of liability to the recipient, for + any liability that these contractual assumptions directly impose on + those licensors and authors. + + All other non-permissive additional terms are considered "further +restrictions" within the meaning of section 10. If the Program as you +received it, or any part of it, contains a notice stating that it is +governed by this License along with a term that is a further +restriction, you may remove that term. If a license document contains +a further restriction but permits relicensing or conveying under this +License, you may add to a covered work material governed by the terms +of that license document, provided that the further restriction does +not survive such relicensing or conveying. 
+ + If you add terms to a covered work in accord with this section, you +must place, in the relevant source files, a statement of the +additional terms that apply to those files, or a notice indicating +where to find the applicable terms. + + Additional terms, permissive or non-permissive, may be stated in the +form of a separately written license, or stated as exceptions; +the above requirements apply either way. + + 8. Termination. + + You may not propagate or modify a covered work except as expressly +provided under this License. Any attempt otherwise to propagate or +modify it is void, and will automatically terminate your rights under +this License (including any patent licenses granted under the third +paragraph of section 11). + + However, if you cease all violation of this License, then your +license from a particular copyright holder is reinstated (a) +provisionally, unless and until the copyright holder explicitly and +finally terminates your license, and (b) permanently, if the copyright +holder fails to notify you of the violation by some reasonable means +prior to 60 days after the cessation. + + Moreover, your license from a particular copyright holder is +reinstated permanently if the copyright holder notifies you of the +violation by some reasonable means, this is the first time you have +received notice of violation of this License (for any work) from that +copyright holder, and you cure the violation prior to 30 days after +your receipt of the notice. + + Termination of your rights under this section does not terminate the +licenses of parties who have received copies or rights from you under +this License. If your rights have been terminated and not permanently +reinstated, you do not qualify to receive new licenses for the same +material under section 10. + + 9. Acceptance Not Required for Having Copies. + + You are not required to accept this License in order to receive or +run a copy of the Program. 
Ancillary propagation of a covered work +occurring solely as a consequence of using peer-to-peer transmission +to receive a copy likewise does not require acceptance. However, +nothing other than this License grants you permission to propagate or +modify any covered work. These actions infringe copyright if you do +not accept this License. Therefore, by modifying or propagating a +covered work, you indicate your acceptance of this License to do so. + + 10. Automatic Licensing of Downstream Recipients. + + Each time you convey a covered work, the recipient automatically +receives a license from the original licensors, to run, modify and +propagate that work, subject to this License. You are not responsible +for enforcing compliance by third parties with this License. + + An "entity transaction" is a transaction transferring control of an +organization, or substantially all assets of one, or subdividing an +organization, or merging organizations. If propagation of a covered +work results from an entity transaction, each party to that +transaction who receives a copy of the work also receives whatever +licenses to the work the party's predecessor in interest had or could +give under the previous paragraph, plus a right to possession of the +Corresponding Source of the work from the predecessor in interest, if +the predecessor has it or can get it with reasonable efforts. + + You may not impose any further restrictions on the exercise of the +rights granted or affirmed under this License. For example, you may +not impose a license fee, royalty, or other charge for exercise of +rights granted under this License, and you may not initiate litigation +(including a cross-claim or counterclaim in a lawsuit) alleging that +any patent claim is infringed by making, using, selling, offering for +sale, or importing the Program or any portion of it. + + 11. Patents. 
+ + A "contributor" is a copyright holder who authorizes use under this +License of the Program or a work on which the Program is based. The +work thus licensed is called the contributor's "contributor version". + + A contributor's "essential patent claims" are all patent claims +owned or controlled by the contributor, whether already acquired or +hereafter acquired, that would be infringed by some manner, permitted +by this License, of making, using, or selling its contributor version, +but do not include claims that would be infringed only as a +consequence of further modification of the contributor version. For +purposes of this definition, "control" includes the right to grant +patent sublicenses in a manner consistent with the requirements of +this License. + + Each contributor grants you a non-exclusive, worldwide, royalty-free +patent license under the contributor's essential patent claims, to +make, use, sell, offer for sale, import and otherwise run, modify and +propagate the contents of its contributor version. + + In the following three paragraphs, a "patent license" is any express +agreement or commitment, however denominated, not to enforce a patent +(such as an express permission to practice a patent or covenant not to +sue for patent infringement). To "grant" such a patent license to a +party means to make such an agreement or commitment not to enforce a +patent against the party. 
+ + If you convey a covered work, knowingly relying on a patent license, +and the Corresponding Source of the work is not available for anyone +to copy, free of charge and under the terms of this License, through a +publicly available network server or other readily accessible means, +then you must either (1) cause the Corresponding Source to be so +available, or (2) arrange to deprive yourself of the benefit of the +patent license for this particular work, or (3) arrange, in a manner +consistent with the requirements of this License, to extend the patent +license to downstream recipients. "Knowingly relying" means you have +actual knowledge that, but for the patent license, your conveying the +covered work in a country, or your recipient's use of the covered work +in a country, would infringe one or more identifiable patents in that +country that you have reason to believe are valid. + + If, pursuant to or in connection with a single transaction or +arrangement, you convey, or propagate by procuring conveyance of, a +covered work, and grant a patent license to some of the parties +receiving the covered work authorizing them to use, propagate, modify +or convey a specific copy of the covered work, then the patent license +you grant is automatically extended to all recipients of the covered +work and works based on it. + + A patent license is "discriminatory" if it does not include within +the scope of its coverage, prohibits the exercise of, or is +conditioned on the non-exercise of one or more of the rights that are +specifically granted under this License. 
You may not convey a covered +work if you are a party to an arrangement with a third party that is +in the business of distributing software, under which you make payment +to the third party based on the extent of your activity of conveying +the work, and under which the third party grants, to any of the +parties who would receive the covered work from you, a discriminatory +patent license (a) in connection with copies of the covered work +conveyed by you (or copies made from those copies), or (b) primarily +for and in connection with specific products or compilations that +contain the covered work, unless you entered into that arrangement, +or that patent license was granted, prior to 28 March 2007. + + Nothing in this License shall be construed as excluding or limiting +any implied license or other defenses to infringement that may +otherwise be available to you under applicable patent law. + + 12. No Surrender of Others' Freedom. + + If conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot convey a +covered work so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you may +not convey it at all. For example, if you agree to terms that obligate you +to collect a royalty for further conveying from those to whom you convey +the Program, the only way you could satisfy both those terms and this +License would be to refrain entirely from conveying the Program. + + 13. Remote Network Interaction; Use with the GNU General Public License. 
+ + Notwithstanding any other provision of this License, if you modify the +Program, your modified version must prominently offer all users +interacting with it remotely through a computer network (if your version +supports such interaction) an opportunity to receive the Corresponding +Source of your version by providing access to the Corresponding Source +from a network server at no charge, through some standard or customary +means of facilitating copying of software. This Corresponding Source +shall include the Corresponding Source for any work covered by version 3 +of the GNU General Public License that is incorporated pursuant to the +following paragraph. + + Notwithstanding any other provision of this License, you have +permission to link or combine any covered work with a work licensed +under version 3 of the GNU General Public License into a single +combined work, and to convey the resulting work. The terms of this +License will continue to apply to the part which is the covered work, +but the work with which it is combined will remain governed by version +3 of the GNU General Public License. + + 14. Revised Versions of this License. + + The Free Software Foundation may publish revised and/or new versions of +the GNU Affero General Public License from time to time. Such new versions +will be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + + Each version is given a distinguishing version number. If the +Program specifies that a certain numbered version of the GNU Affero General +Public License "or any later version" applies to it, you have the +option of following the terms and conditions either of that numbered +version or of any later version published by the Free Software +Foundation. If the Program does not specify a version number of the +GNU Affero General Public License, you may choose any version ever published +by the Free Software Foundation. 
+ + If the Program specifies that a proxy can decide which future +versions of the GNU Affero General Public License can be used, that proxy's +public statement of acceptance of a version permanently authorizes you +to choose that version for the Program. + + Later license versions may give you additional or different +permissions. However, no additional obligations are imposed on any +author or copyright holder as a result of your choosing to follow a +later version. + + 15. Disclaimer of Warranty. + + THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY +APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT +HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY +OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM +IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF +ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. Limitation of Liability. + + IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS +THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY +GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE +USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF +DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD +PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), +EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF +SUCH DAMAGES. + + 17. Interpretation of Sections 15 and 16. 
+ + If the disclaimer of warranty and limitation of liability provided +above cannot be given local legal effect according to their terms, +reviewing courts shall apply local law that most closely approximates +an absolute waiver of all civil liability in connection with the +Program, unless a warranty or assumption of liability accompanies a +copy of the Program in return for a fee. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +state the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. + + <one line to give the program's name and a brief idea of what it does.> + Copyright (C) <year> <name of author> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + +Also add information on how to contact you by electronic and paper mail. + + If your software can interact with users remotely through a computer +network, you should also make sure that it provides a way for users to +get its source. 
For example, if your program is a web application, its +interface could display a "Source" link that leads users to an archive +of the code. There are many ways you could offer source, and different +solutions will be better for different programs; see section 13 for the +specific requirements. + + You should also get your employer (if you work as a programmer) or school, +if any, to sign a "copyright disclaimer" for the program, if necessary. +For more information on this, and how to apply and follow the GNU AGPL, see +<http://www.gnu.org/licenses/>. diff --git a/Makefile.am b/Makefile.am index 7ac77d7..d2617ab 100644 --- a/Makefile.am +++ b/Makefile.am @@ -7,6 +7,7 @@ SOURCES = \ nar-herder/database.scm \ nar-herder/server.scm \ nar-herder/recent-changes.scm \ + nar-herder/cached-compression.scm \ nar-herder/storage.scm \ nar-herder/mirror.scm \ nar-herder/utils.scm @@ -19,4 +19,4 @@ EXTRA_DIST = $(SOURCES) $(NOCOMP_SOURCES) GUILE_WARNINGS = -Wunbound-variable -Warity-mismatch -Wformat SUFFIXES = .scm .go .scm.go: - $(AM_V_GEN)$(top_builddir)/pre-inst-env $(GUILE_TOOLS) compile $(GUILE_TARGET) $(GUILE_WARNINGS) -o "$@" "$<" + $(AM_V_GEN)$(top_builddir)/pre-inst-env $(GUILE_TOOLS) compile -W3 $(GUILE_TARGET) $(GUILE_WARNINGS) -o "$@" "$<" diff --git a/guix-dev.scm b/guix-dev.scm index bd6828f..61e282c 100644 --- a/guix-dev.scm +++ b/guix-dev.scm @@ -53,7 +53,7 @@ (inputs `(("guix" ,guix) ("guile-json" ,guile-json-4) - ("guile-fibers" ,guile-fibers-1.1) + ("guile-fibers" ,guile-fibers-1.3) ("guile-gcrypt" ,guile-gcrypt) ("guile-readline" ,guile-readline) ("guile-lzlib" ,guile-lzlib) diff --git a/nar-herder/cached-compression.scm b/nar-herder/cached-compression.scm new file mode 100644 index 0000000..5c257dc --- /dev/null +++ b/nar-herder/cached-compression.scm @@ -0,0 +1,753 @@ +;;; Nar Herder +;;; +;;; Copyright © 2022, 2023 Christopher Baines <mail@cbaines.net> +;;; +;;; This program is free software: you can redistribute it and/or +;;; modify it under the terms 
of the GNU Affero General Public License +;;; as published by the Free Software Foundation, either version 3 of +;;; the License, or (at your option) any later version. +;;; +;;; This program is distributed in the hope that it will be useful, +;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;; Affero General Public License for more details. +;;; +;;; You should have received a copy of the GNU Affero General Public +;;; License along with this program. If not, see +;;; <http://www.gnu.org/licenses/>. + +(define-module (nar-herder cached-compression) + #:use-module (srfi srfi-1) + #:use-module (srfi srfi-19) + #:use-module (srfi srfi-26) + #:use-module (srfi srfi-71) + #:use-module (ice-9 ftw) + #:use-module (ice-9 match) + #:use-module (ice-9 atomic) + #:use-module (ice-9 threads) + #:use-module (logging logger) + #:use-module (prometheus) + #:use-module (fibers) + #:use-module (fibers timers) + #:use-module (fibers channels) + #:use-module (fibers operations) + #:use-module (web uri) + #:use-module (web client) + #:use-module (web response) + #:use-module (guix store) + #:use-module ((guix utils) #:select (compressed-file? 
+ call-with-decompressed-port))
+ #:use-module ((guix build utils)
+ #:select (dump-port mkdir-p))
+ #:use-module (nar-herder utils)
+ #:use-module (nar-herder database)
+ #:export (start-cached-compression-management-fiber
+ start-cached-compression-removal-fiber
+ start-cached-compression-schedule-removal-fiber))
+
+;; Nar caching overview
+;;
+;; On start
+;; - Compute the size of each cached compression directory
+;; - Remove database entries if they're missing from the directory
+;; - Remove files if they're missing from the database
+;;
+;; On nar usage
+;; - Bump count
+;; - If count is sufficient
+;; - Trigger generation of the cached nar
+;; - Skip if the work queue already includes this job
+;; - At the start of the job, check for a database entry and exit
+;; early if one exists bumping the atime of the file
+;; - If the file doesn't exist, check if there's free space
+;; - If there's free space, create the file
+;;
+;; Nar removal fiber
+;; - Periodically remove nars that have been scheduled for removal
+;; (when the scheduled time passes)
+;;
+;; Nar schedule removal fiber
+;; - Periodically check for nars that haven't been accessed in some
+;; time and schedule them for removal
+
+(define (perform-cached-compression-startup database
+ enabled-cached-compressions
+ nar-cache-files)
+ (let* ((database-entries-missing-files
+ ;; List tracking entries in the database for cached files,
+ ;; where the file is missing from the disk.
+ ;;
+ ;; These database entries will be removed at the end of the
+ ;; startup process.
+ '())
+
+ ;; alist of compression to hash table of files
+ ;;
+ ;; Entries from the hash tables will be removed when the
+ ;; database entry is processed below, so these hash tables
+ ;; will be left reflecting files in the directories, but with
+ ;; no entry in the database.
+ ;;
+ ;; These files will be deleted at the end of the startup
+ ;; process.
+ (files-by-compression
+ (map
+ (match-lambda
+ ((compression . 
details) + (let ((result (make-hash-table))) + (for-each + (lambda (file) + (hash-set! result file #t)) + (scandir (assq-ref details 'directory) + (negate (cut member <> '("." ".."))))) + (cons compression + result)))) + enabled-cached-compressions)) + + (cached-bytes-by-compression + (database-fold-cached-narinfo-files + database + (lambda (details result) + (let ((compression + (assq-ref details 'compression)) + (filename + (store-path-base + (assq-ref details 'store-path)))) + + (let ((files-hash + (assq-ref files-by-compression compression))) + (if (hash-ref files-hash filename) + (begin + (hash-remove! files-hash filename) + + (metric-increment nar-cache-files + #:label-values + `((compression . ,compression))) + + `((,compression . ,(+ (assq-ref details 'size) + (or (assq-ref result compression) + 0))) + ,@(alist-delete compression result))) + + ;; Database entry, but file missing + (begin + (set! database-entries-missing-files + (cons details + database-entries-missing-files)) + result))))) + (map + (lambda (compression) + (cons compression 0)) + (map car enabled-cached-compressions))))) + + ;; Delete cached files with no database entries + (for-each + (match-lambda + ((compression . 
hash-table) + (let ((count (hash-count (const #t) + hash-table))) + (unless (= 0 count) + (let ((directory + (assq-ref + (assq-ref enabled-cached-compressions compression) + 'directory))) + (simple-format #t "deleting ~A cached files from ~A\n" + count + directory) + (hash-for-each + (lambda (filename _) + (delete-file (string-append directory "/" filename))) + hash-table)))))) + files-by-compression) + + ;; Delete database entries where the file is missing + (let ((count (length database-entries-missing-files))) + (unless (= 0 count) + (simple-format + #t + "deleting ~A cached_narinfo_files entries due to missing files\n" + count) + + (for-each + (lambda (details) + (database-remove-cached-narinfo-file database + (assq-ref details 'narinfo-id) + (symbol->string + (assq-ref details 'compression)))) + database-entries-missing-files))) + + cached-bytes-by-compression)) + +;; This fiber manages metadata around cached compressions, and +;; delegates tasks to the thread pool to generate newly compressed +;; nars +(define* (start-cached-compression-management-fiber + database + metrics-registry + nar-source + enabled-cached-compressions + cached-compression-min-uses + #:key (cached-compression-workers 2) + scheduler) + + (define nar-cache-bytes-metric + (make-gauge-metric metrics-registry + "nar_cache_size_bytes" + #:labels '(compression))) + + (define nar-cache-files + (make-gauge-metric metrics-registry + "nar_cache_files" + #:labels '(compression))) + + (define channel + (make-channel)) + + (let ((process-job + count-jobs + count-threads + list-jobs + (create-work-queue cached-compression-workers + (lambda (thunk) + (thunk)) + #:name "cached compression"))) + + (define (consider-narinfo cached-bytes-by-compression + usage-hash-table + narinfo-id) + (let* ((existing-cached-files + ;; This is important both to avoid trying to create + ;; files twice, but also in the case where additional + ;; compressions are enabled and more files need to be + ;; generated + 
(database-select-cached-narinfo-files-by-narinfo-id
+ database
+ narinfo-id))
+ (existing-compressions
+ (map (lambda (details)
+ (assq-ref details 'compression))
+ existing-cached-files))
+ (missing-compressions
+ (lset-difference eq?
+ (map car enabled-cached-compressions)
+ existing-compressions))
+ (narinfo-files
+ (database-select-narinfo-files-by-narinfo-id
+ database
+ narinfo-id))
+
+ (compress-file?
+ (let ((url (assq-ref (first narinfo-files) 'url)))
+ ;; TODO: Maybe this should be configurable?
+ (not (compressed-file? url)))))
+
+ (when (and compress-file?
+ (not (null? missing-compressions)))
+ (let ((new-count
+ (let ((val (+ 1
+ (or (hash-ref usage-hash-table narinfo-id)
+ 0))))
+ (hash-set! usage-hash-table
+ narinfo-id
+ val)
+ val)))
+
+ (when (and (> new-count
+ cached-compression-min-uses))
+ (let* ((narinfo-details
+ (database-select-narinfo database narinfo-id))
+ (nar-size
+ (assq-ref narinfo-details 'nar-size))
+ (compressions-with-space
+ (filter
+ (lambda (compression)
+ (let ((directory-max-size
+ (assq-ref
+ (assq-ref enabled-cached-compressions
+ compression)
+ 'directory-max-size))
+ (current-directory-size
+ (assq-ref cached-bytes-by-compression
+ compression)))
+
+ (unless current-directory-size
+ (log-msg 'ERROR "current-directory-size unset: "
+ current-directory-size))
+ (unless nar-size
+ (log-msg 'ERROR "nar-size unset: "
+ nar-size))
+
+ (if directory-max-size
+ (< (+ current-directory-size
+ ;; Assume the compressed nar could be
+ ;; as big as the uncompressed nar
+ nar-size)
+ directory-max-size)
+ #t)))
+ missing-compressions)))
+ (for-each
+ (lambda (compression)
+ (spawn-fiber
+ (lambda ()
+ (process-job
+ (lambda ()
+ (let ((new-bytes
+ (make-compressed-nar
+ narinfo-files
+ nar-source
+ enabled-cached-compressions
+ compression
+ #:level (assq-ref
+ (assq-ref enabled-cached-compressions
+ compression)
+ 'level))))
+ (put-message channel
+ (list 'cached-narinfo-added
+ narinfo-id
+ compression
+ new-bytes
+ #f))))))))
+ 
compressions-with-space))))))) + + (spawn-fiber + (lambda () + (let ((initial-cached-bytes-by-compression + (perform-cached-compression-startup + database + enabled-cached-compressions + nar-cache-files)) + (nar-cached-compression-usage-hash-table + (make-hash-table 65536))) + + (for-each + (match-lambda + ((compression . bytes) + (metric-set + nar-cache-bytes-metric + bytes + #:label-values `((compression . ,compression))))) + initial-cached-bytes-by-compression) + + (let loop ((cached-bytes-by-compression + initial-cached-bytes-by-compression)) + (match (get-message channel) + (('narinfo-id . narinfo-id) + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "exception considering narinfo (" + narinfo-id "): " exn) + #f) + (lambda () + (consider-narinfo cached-bytes-by-compression + nar-cached-compression-usage-hash-table + narinfo-id)) + #:unwind? #t) + (loop cached-bytes-by-compression)) + + (((and (or 'cached-narinfo-added 'cached-narinfo-removed) + action) + narinfo-id compression size reply) + (let ((updated-bytes + ((if (eq? action 'cached-narinfo-added) + + + -) + (or (assq-ref cached-bytes-by-compression + compression) + 0) + size))) + + (metric-set + nar-cache-bytes-metric + updated-bytes + #:label-values `((compression . ,compression))) + + ((if (eq? action 'cached-narinfo-added) + metric-increment + metric-decrement) + nar-cache-files + #:label-values `((compression . ,compression))) + + ;; Use an explicit transaction as it handles the + ;; database being busy, + (database-call-with-transaction + database + (lambda _ + (if (eq? action 'cached-narinfo-added) + (database-insert-cached-narinfo-file + database + narinfo-id + size + compression) + (let ((cached-narinfo-details + (database-select-cached-narinfo-file-by-narinfo-id-and-compression + database + narinfo-id + compression))) + + ;; It might not have been scheduled for + ;; removal, but remove any schedule that + ;; exists + (let ((schedule-removed? 
+ (database-delete-scheduled-cached-narinfo-removal + database + (assq-ref cached-narinfo-details 'id)))) + (when schedule-removed? + (metric-decrement + (metrics-registry-fetch-metric + metrics-registry + "database_scheduled_cached_narinfo_removal_total")))) + + ;; Remove all the database entries first, as + ;; that'll stop these files appearing in narinfos + (database-remove-cached-narinfo-file + database + narinfo-id + (symbol->string compression)))))) + + (hash-remove! nar-cached-compression-usage-hash-table + narinfo-id) + + (when reply + (put-message reply #t)) + + (loop (alist-cons + compression + updated-bytes + (alist-delete compression + cached-bytes-by-compression))))))))) + scheduler) + + channel)) + +;; Periodically check for nars that haven't been accessed in some time +;; and schedule them for removal +(define (start-cached-compression-schedule-removal-fiber + database + metrics-registry + cached-compression-management-channel + enabled-cached-compressions + cached-compression-removal-fiber-wakeup-channel + base-ttl) + + (define (files-to-schedule-for-removal compression-details) + (let* ((directory (assq-ref compression-details 'directory)) + (unused-removal-duration + (assq-ref compression-details 'unused-removal-duration)) + (atime-threshold + (time-second + (subtract-duration (current-time) + unused-removal-duration)))) + (scandir + directory + (lambda (filename) + (and + (< (stat:atime (stat (string-append directory "/" filename))) + atime-threshold) + (not (member filename '("." 
"..")))))))
+
+ (define (schedule-removal compression compression-details)
+ (let ((files
+ (let ((files
+ (with-time-logging "files-to-schedule-for-removal"
+ (files-to-schedule-for-removal compression-details))))
+ (log-msg 'INFO "cached-compression-schedule-removal-fiber "
+ "looking at " (length files) " files")
+ files))
+ (count-metric
+ (metrics-registry-fetch-metric
+ metrics-registry
+ "database_scheduled_cached_narinfo_removal_total")))
+
+ (with-time-logging "inserting scheduled-cached-narinfo-removals"
+ (for-each
+ (lambda (file)
+ (let* ((cached-narinfo-file-details
+ (database-select-cached-narinfo-file-by-hash
+ database
+ (string-take file 32) ; hash part
+ compression))
+ (existing-scheduled-removal
+ (database-select-scheduled-cached-narinfo-removal
+ database
+ (assq-ref cached-narinfo-file-details 'id))))
+ (unless existing-scheduled-removal
+ (let ((removal-time
+ ;; The earliest this can be removed is the current
+ ;; time, plus the TTL
+ (add-duration
+ (current-time)
+ (make-time time-duration
+ 0
+ (or (assq-ref compression-details 'ttl)
+ base-ttl)))))
+ (database-insert-scheduled-cached-narinfo-removal
+ database
+ (assq-ref cached-narinfo-file-details 'id)
+ removal-time)
+
+ (metric-increment count-metric)))))
+ files))
+
+ ;; Wake the cached compression removal fiber in case one of
+ ;; the new scheduled removals is before it's scheduled to wake
+ ;; up
+ (put-message cached-compression-removal-fiber-wakeup-channel
+ #t)))
+
+ (spawn-fiber
+ (lambda ()
+ (let ((sleep-duration
+ (max
+ (* 60 60 6) ; Maybe this should be configurable
+ (apply min
+ (map (lambda (compression-details)
+ (/ (time-second
+ (assq-ref compression-details
+ 'unused-removal-duration))
+ 4))
+ enabled-cached-compressions))))
+ (start-count
+ (database-count-scheduled-cached-narinfo-removal
+ database)))
+
+ (metric-set
+ (or (metrics-registry-fetch-metric
+ metrics-registry
+ "database_scheduled_cached_narinfo_removal_total")
+ (make-gauge-metric
+ metrics-registry
+ 
"database_scheduled_cached_narinfo_removal_total")) + start-count) + + (while #t + (log-msg 'INFO "cached-compression-schedule-removal-fiber starting pass") + + (for-each + (match-lambda + ((compression . details) + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "cached-compression-schedule-removal-fiber: " + "exception: " exn)) + (lambda () + (schedule-removal compression details)) + #:unwind? #t))) + enabled-cached-compressions) + + (log-msg 'INFO "cached-compression-schedule-removal-fiber sleeping for " + sleep-duration) + (sleep sleep-duration)))))) + +;; Process the scheduled removals for cached nars +(define (start-cached-compression-removal-fiber + database + cached-compression-management-channel + enabled-cached-compressions) + + (define wakeup-channel + (make-channel)) + + (define (make-pass) + (log-msg 'INFO "cached-compression-removal-fiber starting pass") + + (let ((scheduled-cached-narinfo-removal + (database-select-oldest-scheduled-cached-narinfo-removal + database))) + + (log-msg 'INFO "scheduled removal: " scheduled-cached-narinfo-removal) + + (if scheduled-cached-narinfo-removal + (if (time<=? (assq-ref scheduled-cached-narinfo-removal + 'scheduled-removal-time) + (current-time)) + (let ((id (assq-ref scheduled-cached-narinfo-removal 'id)) + (narinfo-id (assq-ref scheduled-cached-narinfo-removal + 'narinfo-id)) + (compression (assq-ref scheduled-cached-narinfo-removal + 'compression)) + (size (assq-ref scheduled-cached-narinfo-removal + 'size)) + (store-path (assq-ref scheduled-cached-narinfo-removal + 'store-path))) + (let ((reply (make-channel))) + (put-message + cached-compression-management-channel + (list 'cached-narinfo-removed + narinfo-id + compression + size + reply)) + + ;; Wait for the management fiber to delete the + ;; database entry before removing the file. 
+ (get-message reply)) + + (let ((directory + (assq-ref (assq-ref enabled-cached-compressions + compression) + 'directory))) + (let ((filename + (string-append + directory "/" + (basename store-path)))) + (log-msg 'INFO "deleting " filename) + (delete-file filename)))) + + (let ((duration + (time-difference + (assq-ref scheduled-cached-narinfo-removal + 'scheduled-removal-time) + (current-time)))) + (perform-operation + (choice-operation + (sleep-operation (max 1 (+ 1 (time-second duration)))) + (get-operation wakeup-channel))))) + + ;; Sleep until woken + (get-message wakeup-channel)))) + + (spawn-fiber + (lambda () + (while #t + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "exception in cached-compression-removal-fiber: " + exn)) + (lambda () + (with-throw-handler #t + make-pass + (lambda _ + (backtrace)))) + #:unwind? #t)))) + + wakeup-channel) + +(define* (make-compressed-nar narinfo-files + nar-source + enabled-cached-compressions + target-compression + #:key level) + (define cached-compression-details + (assq-ref enabled-cached-compressions target-compression)) + + (log-msg 'INFO "making " target-compression " for " + (uri-decode + (basename + (assq-ref (first narinfo-files) 'url)))) + + (let* ((source-narinfo-file + ;; There's no specific logic to this, it should be possible + ;; to use any file + (first narinfo-files)) + (source-filename + (cond + ((string-prefix? "http" nar-source) + (let* ((output-port (mkstemp "/tmp/nar-herder-source-nar-XXXXXX")) + (filename + (port-filename output-port)) + (uri + (string->uri + (string-append nar-source + (assq-ref source-narinfo-file 'url))))) + + (log-msg 'DEBUG "downloading " (uri->string uri)) + (with-exception-handler + (lambda (exn) + (close-port output-port) + (delete-file filename) + (raise-exception exn)) + (lambda () + (with-port-timeouts + (lambda () + (call-with-values + (lambda () + (let ((port + socket + (open-socket-for-uri* uri))) + (http-get uri + #:port port + #:decode-body? 
#f + #:streaming? #t))) + (lambda (response body) + (unless (= (response-code response) + 200) + (error "unknown response code" + (response-code response))) + + (dump-port body output-port))) + (close-port output-port)) + #:timeout 30)) + #:unwind? #t) + + filename)) + ((string-prefix? "/" nar-source) + (string-append + ;; If it's a filename, then it's the canonical path to + ;; the storage directory + nar-source + (uri-decode (assq-ref source-narinfo-file 'url)))) + (else + (error "unknown nar source"))))) + + (let* ((dest-directory + (assq-ref cached-compression-details + 'directory)) + (dest-filename + (string-append + dest-directory + "/" + (uri-decode + (basename + (assq-ref source-narinfo-file 'url))))) + (tmp-dest-filename + (string-append dest-filename ".tmp"))) + + (when (file-exists? tmp-dest-filename) + (delete-file tmp-dest-filename)) + (when (file-exists? dest-filename) + (delete-file dest-filename)) + + (mkdir-p dest-directory) + + (call-with-input-file + source-filename + (lambda (source-port) + (call-with-decompressed-port + (string->symbol + (assq-ref source-narinfo-file + 'compression)) + source-port + (lambda (decompressed-source-port) + (let ((call-with-compressed-output-port* + (match target-compression + ('gzip + (@ (zlib) call-with-gzip-output-port)) + ('lzip + (@ (lzlib) call-with-lzip-output-port)) + ('zstd + (@ (zstd) call-with-zstd-output-port)) + ('none + (lambda (port proc) + (proc port)))))) + (apply + call-with-compressed-output-port* + (open-output-file tmp-dest-filename) + (lambda (compressed-port) + (dump-port decompressed-source-port + compressed-port)) + (if level + `(#:level ,level) + '()))))))) + (rename-file + tmp-dest-filename + dest-filename) + + (when (string-prefix? 
"http" nar-source) + (log-msg 'DEBUG "deleting temporary file " source-filename) + (delete-file source-filename)) + + (let ((bytes + (stat:size (stat dest-filename)))) + (log-msg 'INFO "created " dest-filename) + + bytes)))) diff --git a/nar-herder/database.scm b/nar-herder/database.scm index 61f5bb1..ded7c2c 100644 --- a/nar-herder/database.scm +++ b/nar-herder/database.scm @@ -28,10 +28,12 @@ #:use-module (sqlite3) #:use-module (fibers) #:use-module (prometheus) + #:use-module (guix store) #:use-module (guix narinfo) #:use-module (guix derivations) #:use-module (nar-herder utils) #:export (setup-database + update-database-metrics! database-optimize database-spawn-fibers @@ -42,8 +44,11 @@ database-insert-narinfo database-remove-narinfo + database-select-narinfo + database-select-narinfo-by-hash database-select-narinfo-contents-by-hash + database-count-recent-changes database-select-recent-changes database-select-latest-recent-change-datetime database-get-recent-changes-id-for-deletion @@ -51,8 +56,25 @@ database-select-narinfo-for-file database-select-narinfo-files - - database-map-all-narinfo-files)) + database-select-narinfo-files-by-narinfo-id + + database-fold-all-narinfo-files + database-map-all-narinfo-files + database-count-narinfo-files + + database-insert-cached-narinfo-file + database-select-cached-narinfo-file-by-hash + database-select-cached-narinfo-file-by-narinfo-id-and-compression + database-select-cached-narinfo-files-by-narinfo-id + database-fold-cached-narinfo-files + database-remove-cached-narinfo-file + + database-select-scheduled-narinfo-removal + database-select-scheduled-cached-narinfo-removal + database-delete-scheduled-cached-narinfo-removal + database-select-oldest-scheduled-cached-narinfo-removal + database-count-scheduled-cached-narinfo-removal + database-insert-scheduled-cached-narinfo-removal)) (define-record-type <database> (make-database database-file reader-thread-channel writer-thread-channel @@ -85,7 +107,8 @@ CREATE TABLE 
narinfos ( nar_size INTEGER NOT NULL, deriver TEXT, system TEXT, - contents NOT NULL + contents NOT NULL, + added_at TEXT ); CREATE UNIQUE INDEX narinfos_store_hash ON narinfos (substr(store_path, 12, 32)); @@ -97,6 +120,8 @@ CREATE TABLE narinfo_files ( url TEXT NOT NULL ); +CREATE INDEX narinfo_files_narinfo_id ON narinfo_files (narinfo_id); + CREATE TABLE narinfo_references ( narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), store_path TEXT NOT NULL @@ -120,28 +145,124 @@ CREATE TABLE recent_changes ( datetime TEXT NOT NULL, change TEXT NOT NULl, data TEXT NOT NULL +); + +CREATE TABLE cached_narinfo_files ( + id INTEGER PRIMARY KEY ASC, + narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), + size INTEGER NOT NULL, + compression TEXT +); + +CREATE INDEX cached_narinfo_files_narinfo_id + ON cached_narinfo_files (narinfo_id); + +CREATE TABLE scheduled_narinfo_removal ( + narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id), + removal_datetime TEXT NOT NULL +); + +CREATE TABLE scheduled_cached_narinfo_removal ( + cached_narinfo_file_id INTEGER PRIMARY KEY ASC REFERENCES cached_narinfo_files (id), + removal_datetime TEXT NOT NULL );") (sqlite-exec db schema)) -(define (update-schema db) +(define (table-exists? db name) (let ((statement (sqlite-prepare db " -SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name"))) +SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = :name"))) (sqlite-bind-arguments statement - #:name "narinfos") + #:name name) + + (let ((result + (match (sqlite-step statement) + (#f #f) + (#(1) #t)))) + (sqlite-finalize statement) - (match (sqlite-step statement) - (#f (perform-initial-database-setup db)) - (_ #f)) + result))) + +(define (column-exists? 
db table-name column-name) + (let ((statement + (sqlite-prepare + db + (simple-format #f "PRAGMA table_info(~A);" table-name)))) + + (let ((columns + (sqlite-map + (lambda (row) + (vector-ref row 1)) + statement))) + (sqlite-finalize statement) + + (member column-name columns)))) + +(define (update-schema db) + (unless (table-exists? db "narinfos") + (perform-initial-database-setup db)) + + (unless (table-exists? db "cached_narinfo_files") + (sqlite-exec + db + " +CREATE TABLE cached_narinfo_files ( + id INTEGER PRIMARY KEY ASC, + narinfo_id INTEGER NOT NULL REFERENCES narinfos (id), + size INTEGER NOT NULL, + compression TEXT NOT NULL +); - (sqlite-finalize statement))) +CREATE INDEX cached_narinfo_files_narinfo_id + ON cached_narinfo_files (narinfo_id);")) + + (unless (column-exists? db "narinfos" "added_at") + (sqlite-exec + db + "ALTER TABLE narinfos ADD COLUMN added_at TEXT;")) + + (unless (table-exists? db "scheduled_narinfo_removal") + (sqlite-exec + db + " +CREATE TABLE scheduled_narinfo_removal ( + narinfo_id INTEGER PRIMARY KEY ASC REFERENCES narinfos (id), + removal_datetime TEXT NOT NULL +);")) + + (unless (table-exists? 
db "scheduled_cached_narinfo_removal") + (sqlite-exec + db + " +CREATE TABLE scheduled_cached_narinfo_removal ( + cached_narinfo_file_id INTEGER PRIMARY KEY ASC REFERENCES cached_narinfo_files (id), + removal_datetime TEXT NOT NULL +);")) + + (sqlite-exec + db + "CREATE INDEX IF NOT EXISTS narinfo_tags_narinfo_id + ON narinfo_tags (narinfo_id);") + + (sqlite-exec + db + "CREATE INDEX IF NOT EXISTS narinfo_references_narinfo_id + ON narinfo_references (narinfo_id);") + + (sqlite-exec + db + "CREATE INDEX IF NOT EXISTS narinfo_files_narinfo_id + ON narinfo_files (narinfo_id);")) + +(define* (setup-database database-file metrics-registry + #:key (reader-threads 1)) + (define mmap-size #f) -(define (setup-database database-file metrics-registry) (let ((db (db-open database-file))) (sqlite-exec db "PRAGMA journal_mode=WAL;") (sqlite-exec db "PRAGMA optimize;") @@ -149,44 +270,77 @@ SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name"))) (update-schema db) + ;; (let ((requested-mmap-bytes 2147418112) + ;; (statement + ;; (sqlite-prepare + ;; db + ;; (simple-format #f "PRAGMA mmap_size=~A;" + ;; 2147418112)))) + ;; (match (sqlite-step statement) + ;; (#(result-mmap-size) + ;; (sqlite-finalize statement) + ;; (set! mmap-size + ;; result-mmap-size)))) + (sqlite-close db)) (let ((reader-thread-channel - (make-worker-thread-channel + (make-worker-thread-set (lambda () (let ((db (db-open database-file #:write? 
#f))) (sqlite-exec db "PRAGMA busy_timeout = 5000;") + (when mmap-size + (sqlite-exec + db + (simple-format #f "PRAGMA mmap_size=~A;" + (number->string mmap-size)))) (list db))) #:destructor (lambda (db) (sqlite-close db)) #:lifetime 50000 + #:name "db r" ;; Use a minimum of 2 and a maximum of 8 threads - #:parallelism - (min (max (current-processor-count) - 2) - 64) + #:parallelism reader-threads #:delay-logger (let ((delay-metric (make-histogram-metric metrics-registry "database_read_delay_seconds"))) - (lambda (seconds-delayed) + (lambda (seconds-delayed proc) (metric-observe delay-metric seconds-delayed) (when (> seconds-delayed 1) - (format - (current-error-port) - "warning: database read delayed by ~1,2f seconds~%" - seconds-delayed)))))) + (display + (format + #f + "warning: database read (~a) delayed by ~1,2f seconds~%" + proc + seconds-delayed) + (current-error-port))))) + #:duration-logger + (lambda (duration proc) + (when (> duration 5) + (display + (format + #f + "warning: database read took ~1,2f seconds (~a)~%" + duration + proc) + (current-error-port)))))) (writer-thread-channel - (make-worker-thread-channel + (make-worker-thread-set (lambda () (let ((db (db-open database-file))) (sqlite-exec db "PRAGMA busy_timeout = 5000;") (sqlite-exec db "PRAGMA foreign_keys = ON;") + (when mmap-size + (sqlite-exec + db + (simple-format #f "PRAGMA mmap_size=~A;" + (number->string mmap-size)))) (list db))) #:destructor (lambda (db) @@ -195,6 +349,7 @@ SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name"))) (sqlite-close db)) #:lifetime 500 + #:name "db w" ;; SQLite doesn't support parallel writes #:parallelism 1 @@ -202,19 +357,56 @@ SELECT name FROM sqlite_master WHERE type = 'table' AND name = :name"))) (make-histogram-metric metrics-registry "database_write_delay_seconds"))) - (lambda (seconds-delayed) + (lambda (seconds-delayed proc) (metric-observe delay-metric seconds-delayed) (when (> seconds-delayed 1) - (format - (current-error-port) - 
"warning: database write delayed by ~1,2f seconds~%" - seconds-delayed))))))) + (display + (format + #f + "warning: database write (~a) delayed by ~1,2f seconds~%" + proc + seconds-delayed) + (current-error-port))))) + #:duration-logger + (lambda (duration proc) + (when (> duration 5) + (display + (format + #f + "warning: database write took ~1,2f seconds (~a)~%" + duration + proc) + (current-error-port))))))) (make-database database-file reader-thread-channel writer-thread-channel metrics-registry))) +(define (update-database-metrics! database) + (let* ((db-filename (database-file database)) + (db-wal-filename + (string-append db-filename "-wal")) + + (registry (database-metrics-registry database)) + (db-bytes + (or (metrics-registry-fetch-metric registry + "database_bytes") + (make-gauge-metric + registry "database_bytes" + #:docstring "Size of the SQLite database file"))) + (db-wal-bytes + (or (metrics-registry-fetch-metric registry + "database_wal_bytes") + (make-gauge-metric + registry "database_wal_bytes" + #:docstring "Size of the SQLite Write Ahead Log file")))) + + + (metric-set db-bytes (stat:size (stat db-filename))) + (metric-set db-wal-bytes (stat:size (stat db-wal-filename)))) + #t) + (define (db-optimize db db-filename) (define (wal-size) (let ((db-wal-filename @@ -267,43 +459,110 @@ PRAGMA optimize;"))) (string-append "database_" thing "_duration_seconds")) (if registry - (let* ((metric - (or (metrics-registry-fetch-metric registry metric-name) - (make-histogram-metric registry - metric-name))) - (start-time (get-internal-real-time))) - (let ((result (thunk))) - (metric-observe metric - (/ (- (get-internal-real-time) start-time) - internal-time-units-per-second)) - result)) + (call-with-duration-metric registry + metric-name + thunk) (thunk))) (define %current-transaction-proc (make-parameter #f)) (define* (database-call-with-transaction database proc - #:key - readonly?) + #:key + readonly? + (immediate? 
(not readonly?))) (define (run-proc-within-transaction db) - (if (%current-transaction-proc) - (proc db) ; already in transaction - (begin - (sqlite-exec db "BEGIN TRANSACTION;") - (with-exception-handler - (lambda (exn) - (simple-format (current-error-port) - "error: sqlite rolling back transaction\n") - (sqlite-exec db "ROLLBACK TRANSACTION;") - (raise-exception exn)) + (define (attempt-begin) + (with-exception-handler + (lambda (exn) + (match (exception-args exn) + (('sqlite-exec 5 msg) + (simple-format + (current-error-port) + "warning: issue starting transaction (code: 5, proc: ~A): ~A\n" + proc msg) + #f) + (_ + (simple-format (current-error-port) + "exception starting transaction: ~A\n" exn) + (raise-exception exn)))) + (lambda () + (sqlite-exec db (if immediate? + "BEGIN IMMEDIATE TRANSACTION;" + "BEGIN TRANSACTION;")) + #t) + #:unwind? #t)) + + (define (attempt-commit) + (with-exception-handler + (lambda (exn) + (match (exception-args exn) + (('sqlite-exec 5 msg) + (simple-format + (current-error-port) + "warning: attempt commit (code: 5, proc: ~A): ~A\n" + proc msg) + #f) + (_ + (simple-format (current-error-port) + "exception committing transaction: ~A\n" exn) + (raise-exception exn)))) + (lambda () + (sqlite-exec db "COMMIT TRANSACTION;") + #t) + #:unwind? #t)) + + (if (attempt-begin) + (call-with-values (lambda () - (call-with-values - (lambda () - (parameterize ((%current-transaction-proc proc)) - (proc db))) - (lambda vals - (sqlite-exec db "COMMIT TRANSACTION;") - (apply values vals)))))))) + (with-exception-handler + (lambda (exn) + (simple-format (current-error-port) + "error: sqlite rolling back transaction (~A)\n" + exn) + (sqlite-exec db "ROLLBACK TRANSACTION;") + (raise-exception exn)) + (lambda () + (parameterize ((%current-transaction-proc proc)) + (proc-with-duration-timing db))) + #:unwind? #t)) + (lambda vals + (let loop ((success? (attempt-commit))) + (if success? 
+ (apply values vals) + (loop (attempt-commit)))))) + + ;; Database is busy, so retry + (run-proc-within-transaction db))) + + (define (proc-with-duration-timing db) + (let ((start-time (get-internal-real-time))) + (call-with-values + (lambda () + (with-throw-handler #t + (lambda () + (proc db)) + (lambda (key . args) + (simple-format + (current-error-port) + "exception in transaction: ~A: ~A\n" + key args) + (backtrace)))) + (lambda vals + (let ((duration-seconds + (/ (- (get-internal-real-time) start-time) + internal-time-units-per-second))) + (when (and (not readonly?) + (> duration-seconds 2)) + (display + (format + #f + "warning: ~a:\n took ~4f seconds in transaction\n" + proc + duration-seconds) + (current-error-port))) + + (cons duration-seconds vals)))))) (match (call-with-worker-thread ((if readonly? @@ -311,25 +570,9 @@ PRAGMA optimize;"))) database-writer-thread-channel) database) (lambda (db) - (let ((start-time (get-internal-real-time))) - (call-with-values - (lambda () - (run-proc-within-transaction db)) - (lambda vals - (let ((duration-seconds - (/ (- (get-internal-real-time) start-time) - internal-time-units-per-second))) - (when (and (not readonly?) - (> duration-seconds 2)) - (display - (format - #f - "warning: ~a:\n took ~4f seconds in transaction\n" - proc - duration-seconds) - (current-error-port))) - - (cons duration-seconds vals))))))) + (if (%current-transaction-proc) + (proc-with-duration-timing db) ; already in transaction + (run-proc-within-transaction db)))) ((duration vals ...) (apply values vals)))) @@ -417,9 +660,9 @@ SELECT id FROM tags WHERE key = :key AND value = :value" db " INSERT INTO narinfos ( - store_path, nar_hash, nar_size, deriver, system, contents + store_path, nar_hash, nar_size, deriver, system, contents, added_at ) VALUES ( - :store_path, :nar_hash, :nar_size, :deriver, :system, :contents + :store_path, :nar_hash, :nar_size, :deriver, :system, :contents, :added_at )" #:cache? 
#t))) (sqlite-bind-arguments @@ -429,7 +672,8 @@ INSERT INTO narinfos ( #:nar_size (narinfo-size narinfo) #:deriver (narinfo-deriver narinfo) #:system (narinfo-system narinfo) - #:contents (narinfo-contents narinfo)) + #:contents (narinfo-contents narinfo) + #:added_at (date->string (current-date) "~1 ~3")) (sqlite-step statement) (sqlite-reset statement) @@ -563,23 +807,6 @@ INSERT INTO narinfo_tags (narinfo_id, tag_id) VALUES (:narinfo_id, :tag_id)" (define* (database-remove-narinfo database store-path #:key change-datetime) - (define (store-path->narinfo-id db) - (let ((statement - (sqlite-prepare - db - " -SELECT id FROM narinfos WHERE store_path = :store_path" - #:cache? #t))) - - (sqlite-bind-arguments - statement - #:store_path store-path) - - (let ((result (vector-ref (sqlite-step statement) 0))) - (sqlite-reset statement) - - result))) - (define (remove-narinfo-record db id) (let ((statement (sqlite-prepare @@ -677,18 +904,90 @@ DELETE FROM narinfo_tags WHERE narinfo_id = :narinfo_id" (database-call-with-transaction database (lambda (db) - (let ((narinfo-id (store-path->narinfo-id db))) - (if change-datetime - (insert-change-with-datetime db store-path - change-datetime) - (insert-change db store-path)) + (let ((narinfo-details + (database-select-narinfo-by-hash + database + (store-path-hash-part store-path)))) + (if narinfo-details + (let ((narinfo-id (assq-ref narinfo-details + 'id))) + (if change-datetime + (insert-change-with-datetime db store-path + change-datetime) + (insert-change db store-path)) + + (remove-narinfo-files db narinfo-id) + (remove-narinfo-references db narinfo-id) + (remove-tags db narinfo-id) + + (remove-narinfo-record db narinfo-id) + #t) + #f))))) + +(define (database-select-narinfo database id) + (call-with-time-tracking + database + "select_narinfo" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT store_path, nar_hash, 
nar_size, deriver, system +FROM narinfos +WHERE id = :id" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:id id) - (remove-narinfo-files db narinfo-id) - (remove-narinfo-references db narinfo-id) - (remove-tags db narinfo-id) - (remove-narinfo-record db narinfo-id) + (match (let ((result (sqlite-step statement))) + (sqlite-reset statement) + result) + (#(store_path nar_hash nar_size deriver system) + `((store-path . ,store_path) + (nar-hash . ,nar_hash) + (nar-size . ,nar_size) + (deriver . ,deriver) + (system . ,system))) + (_ + #f)))))))) + +(define (database-select-narinfo-by-hash database hash) + (call-with-time-tracking + database + "select_narinfo_by_hash" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT id, store_path, nar_hash, nar_size, deriver, system +FROM narinfos +WHERE substr(store_path, 12, 32) = :hash" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:hash hash) - #t)))) + (match (let ((result (sqlite-step statement))) + (sqlite-reset statement) + result) + (#(id store_path nar_hash nar_size deriver system) + `((id . ,id) + (store-path . ,store_path) + (nar-hash . ,nar_hash) + (nar-size . ,nar_size) + (deriver . ,deriver) + (system . ,system))) + (_ + #f)))))))) (define (database-select-narinfo-contents-by-hash database hash) (call-with-time-tracking @@ -702,7 +1001,7 @@ DELETE FROM narinfo_tags WHERE narinfo_id = :narinfo_id" (sqlite-prepare db " -SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash" +SELECT id, contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash" #:cache? 
#t))) (sqlite-bind-arguments statement @@ -711,8 +1010,27 @@ SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash" (match (let ((result (sqlite-step statement))) (sqlite-reset statement) result) - (#(contents) contents) - (_ #f)))))))) + (#(id contents) + (values contents id)) + (_ + (values #f #f))))))))) + +(define (database-count-recent-changes database) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT COUNT(*) FROM recent_changes" + #:cache? #t))) + + (let ((result + (match (sqlite-step statement) + (#(count) count)))) + (sqlite-reset statement) + result))))) (define* (database-select-recent-changes database after-date #:key (limit 8192)) (call-with-worker-thread @@ -722,26 +1040,27 @@ SELECT contents FROM narinfos WHERE substr(store_path, 12, 32) = :hash" (sqlite-prepare db " -SELECT datetime, change, data FROM recent_changes WHERE datetime >= :datetime LIMIT :limit" +SELECT datetime, change, data +FROM recent_changes +WHERE datetime >= :datetime +ORDER BY datetime ASC +LIMIT :limit" #:cache? #t))) (sqlite-bind-arguments statement #:datetime after-date #:limit limit) - (let loop ((row (sqlite-step statement)) - (result '())) - (match row - (#(datetime change data) - (loop (sqlite-step statement) - (cons `((datetime . ,datetime) - (change . ,change) - (data . ,data)) - result))) - (#f - (sqlite-reset statement) - - (reverse result)))))))) + (let ((result + (sqlite-map + (match-lambda + (#(datetime change data) + `((datetime . ,datetime) + (change . ,change) + (data . 
,data)))) + statement))) + (sqlite-reset statement) + result))))) (define (database-select-latest-recent-change-datetime database) (call-with-worker-thread @@ -869,7 +1188,42 @@ WHERE substr(narinfos.store_path, 12, 32) = :hash" result))))))) -(define (database-map-all-narinfo-files database proc) +(define (database-select-narinfo-files-by-narinfo-id database narinfo-id) + (call-with-time-tracking + database + "select_narinfo_files_by_narinfo_id" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT narinfo_files.size, narinfo_files.compression, narinfo_files.url +FROM narinfos +INNER JOIN narinfo_files + ON narinfos.id = narinfo_files.narinfo_id +WHERE narinfos.id = :narinfo_id" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id) + + (let ((result + (sqlite-map + (match-lambda + (#(size compression url) + `((size . ,size) + (compression . ,compression) + (url . ,url)))) + statement))) + (sqlite-reset statement) + + result))))))) + +(define (database-fold-all-narinfo-files database proc init) (call-with-worker-thread (database-reader-thread-channel database) (lambda (db) @@ -885,12 +1239,410 @@ FROM narinfo_files" (lambda (row result) (match row (#(size compression url) - (cons (proc `((size . ,size) - (compression . ,compression) - (url . ,url))) + (proc `((size . ,size) + (compression . ,compression) + (url . ,url)) + result)))) + init + statement))) + (sqlite-reset statement) + + result-list))))) + +(define (database-map-all-narinfo-files database proc) + (database-fold-all-narinfo-files + database + (lambda (nar-file result) + (cons (proc nar-file) + result)) + '())) + +(define (database-count-narinfo-files database) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT COUNT(*) FROM narinfo_files" + #:cache? 
#t))) + + (let ((result + (vector-ref (sqlite-step statement) + 0))) + (sqlite-reset statement) + + result))))) + +(define (database-insert-cached-narinfo-file database + narinfo-id + size + compression) + (call-with-worker-thread + (database-writer-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO cached_narinfo_files ( + narinfo_id, size, compression +) VALUES ( + :narinfo_id, :size, :compression +)" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id + #:size size + #:compression (symbol->string compression)) + + (sqlite-step statement) + (sqlite-reset statement) + + (last-insert-rowid db))))) + +(define (database-select-cached-narinfo-file-by-hash database + hash + compression) + (call-with-time-tracking + database + "select_cached_narinfo_file_by_hash" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT cached_narinfo_files.id, cached_narinfo_files.size +FROM narinfos +INNER JOIN cached_narinfo_files + ON cached_narinfo_files.narinfo_id = narinfos.id +WHERE substr(narinfos.store_path, 12, 32) = :hash + AND cached_narinfo_files.compression = :compression" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:hash hash + #:compression (symbol->string compression)) + + (let ((result + (match (sqlite-step statement) + (#(id size) + `((id . ,id) + (size . 
,size))) + (#f #f)))) + (sqlite-reset statement) + + result))))))) + +(define (database-select-cached-narinfo-files-by-narinfo-id + database + narinfo-id) + (call-with-time-tracking + database + "select_cached_narinfo_file_by_narinfo_id" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT cached_narinfo_files.id, + narinfos.store_path, + cached_narinfo_files.size, + cached_narinfo_files.compression +FROM cached_narinfo_files +INNER JOIN narinfos + ON cached_narinfo_files.narinfo_id = narinfos.id +WHERE narinfo_id = :narinfo_id" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id) + + (let ((result + (sqlite-map + (match-lambda + (#(id store_path size compression) + `((id . ,id) + (store-path . ,store_path) + (size . ,size) + (compression . ,(string->symbol compression))))) + statement))) + (sqlite-reset statement) + + result))))))) + +(define (database-select-cached-narinfo-file-by-narinfo-id-and-compression + database + narinfo-id + compression) + (call-with-time-tracking + database + "select_cached_narinfo_file_by_narinfo_id_and_compression" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT cached_narinfo_files.id, + narinfos.store_path, + cached_narinfo_files.size, + cached_narinfo_files.compression +FROM cached_narinfo_files +INNER JOIN narinfos + ON cached_narinfo_files.narinfo_id = narinfos.id +WHERE narinfo_id = :narinfo_id + AND compression = :compression" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id + #:compression (symbol->string compression)) + + (let ((result + (match (sqlite-step statement) + (#(id store_path size compression) + `((id . ,id) + (store-path . ,store_path) + (size . ,size) + (compression . 
,(string->symbol compression))))))) + (sqlite-reset statement) + + result))))))) + +(define (database-fold-cached-narinfo-files database + proc + init) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT store_path, size, compression, narinfo_id +FROM cached_narinfo_files +INNER JOIN narinfos + ON cached_narinfo_files.narinfo_id = narinfos.id" + #:cache? #t))) + (let ((result-list + (sqlite-fold + (lambda (row result) + (match row + (#(store_path size compression narinfo_id) + (proc `((size . ,size) + (compression . ,(string->symbol compression)) + (store-path . ,store_path) + (narinfo-id . ,narinfo_id)) result)))) - '() + init statement))) (sqlite-reset statement) result-list))))) + +(define (database-remove-cached-narinfo-file database narinfo-id compression) + (call-with-worker-thread + (database-writer-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +DELETE FROM cached_narinfo_files +WHERE narinfo_id = :narinfo_id + AND compression = :compression" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id + #:compression compression) + + (sqlite-step statement) + (sqlite-reset statement))))) + +(define (database-select-scheduled-narinfo-removal database narinfo-id) + (call-with-time-tracking + database + "select_scheduled_narinfo_removal" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT removal_datetime +FROM scheduled_narinfo_removal +WHERE narinfo_id = :narinfo_id" + #:cache? 
#t))) + (sqlite-bind-arguments + statement + #:narinfo_id narinfo-id) + + (let ((result + (match (sqlite-step statement) + (#(datetime) + (date->time-utc + (string->date + datetime + "~Y-~m-~d ~H:~M:~S"))) + (#f #f)))) + (sqlite-reset statement) + result))))))) + +(define (database-select-scheduled-cached-narinfo-removal database + cached-narinfo-file-id) + (call-with-time-tracking + database + "select_scheduled_narinfo_removal" + (lambda () + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT removal_datetime +FROM scheduled_cached_narinfo_removal +WHERE cached_narinfo_file_id = :cached_narinfo_file_id" + #:cache? #t))) + (sqlite-bind-arguments + statement + #:cached_narinfo_file_id cached-narinfo-file-id) + + (let ((result + (match (sqlite-step statement) + (#(datetime) + (date->time-utc + (string->date + datetime + "~Y-~m-~d ~H:~M:~S"))) + (#f #f)))) + (sqlite-reset statement) + result))))))) + +(define (database-delete-scheduled-cached-narinfo-removal database + cached-narinfo-file-id) + (call-with-worker-thread + (database-writer-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +DELETE FROM scheduled_cached_narinfo_removal +WHERE cached_narinfo_file_id = :cached_narinfo_file_id +RETURNING 1" + #:cache? 
#t))) + (sqlite-bind-arguments + statement + #:cached_narinfo_file_id cached-narinfo-file-id) + + (let ((result (->bool (sqlite-step statement)))) + (sqlite-reset statement) + + result))))) + +(define (database-select-oldest-scheduled-cached-narinfo-removal database) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT cached_narinfo_files.id, + cached_narinfo_files.narinfo_id, + cached_narinfo_files.size, + cached_narinfo_files.compression, + narinfos.store_path, + scheduled_cached_narinfo_removal.removal_datetime +FROM scheduled_cached_narinfo_removal +INNER JOIN cached_narinfo_files + ON scheduled_cached_narinfo_removal.cached_narinfo_file_id = + cached_narinfo_files.id +INNER JOIN narinfos + ON cached_narinfo_files.narinfo_id = narinfos.id +ORDER BY scheduled_cached_narinfo_removal.removal_datetime DESC +LIMIT 1" + #:cache? #t))) + + (let ((result + (match (sqlite-step statement) + (#(id narinfo_id size compression store_path datetime) + `((id . ,id) + (narinfo-id . ,narinfo_id) + (size . ,size) + (compression . ,(string->symbol compression)) + (store-path . ,store_path) + (scheduled-removal-time . ,(date->time-utc + (string->date + datetime + "~Y-~m-~d ~H:~M:~S"))))) + (#f #f)))) + (sqlite-reset statement) + result))))) + +(define (database-count-scheduled-cached-narinfo-removal database) + (call-with-worker-thread + (database-reader-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +SELECT COUNT(*) FROM scheduled_cached_narinfo_removal" + #:cache? 
#t))) + + (let ((result + (vector-ref (sqlite-step statement) + 0))) + (sqlite-reset statement) + + result))))) + +(define (database-insert-scheduled-cached-narinfo-removal database + cached-narinfo-file-id + removal-datetime) + (call-with-worker-thread + (database-writer-thread-channel database) + (lambda (db) + (let ((statement + (sqlite-prepare + db + " +INSERT INTO scheduled_cached_narinfo_removal ( + cached_narinfo_file_id, removal_datetime +) VALUES ( + :cached_narinfo_file_id, :removal_datetime +)" + #:cache? #t))) + + (sqlite-bind-arguments + statement + #:cached_narinfo_file_id cached-narinfo-file-id + #:removal_datetime (date->string + (time-utc->date removal-datetime) + "~Y-~m-~d ~H:~M:~S")) + + (sqlite-step statement) + (sqlite-reset statement) + + #t)))) diff --git a/nar-herder/mirror.scm b/nar-herder/mirror.scm index 6b9f4f7..a784165 100644 --- a/nar-herder/mirror.scm +++ b/nar-herder/mirror.scm @@ -19,6 +19,7 @@ (define-module (nar-herder mirror) #:use-module (srfi srfi-1) #:use-module (srfi srfi-43) + #:use-module (srfi srfi-71) #:use-module (ice-9 match) #:use-module (ice-9 threads) #:use-module (ice-9 exceptions) @@ -29,22 +30,18 @@ #:use-module (prometheus) #:use-module (logging logger) #:use-module (json) + #:use-module (fibers) + #:use-module (fibers channels) #:use-module (guix narinfo) #:use-module ((guix store) #:select (store-path-hash-part)) #:use-module (nar-herder utils) #:use-module (nar-herder database) #:use-module (nar-herder storage) - #:export (start-fetch-changes-thread)) - -(define (start-fetch-changes-thread database storage-root - mirror metrics-registry) - (define nar-files-metric - (or (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (make-gauge-metric metrics-registry - "nar_files_total" - #:labels '(stored)))) + #:export (start-fetch-changes-fiber)) +(define (start-fetch-changes-fiber database metrics-registry + storage-root mirror + cached-compression-management-channel) (define 
(request-recent-changes) (define latest-recent-change (database-select-latest-recent-change-datetime database)) @@ -72,18 +69,25 @@ (lambda () (retry-on-error (lambda () - (http-get uri #:decode-body? #f)) + (log-msg 'INFO "querying for recent changes since " + latest-recent-change) + (with-port-timeouts + (lambda () + (let ((port + socket + (open-socket-for-uri* uri))) + (http-get uri + #:port port + #:streaming? #t))) + #:timeout 30)) #:times 3 #:delay 15)) (lambda (response body) (if (= (response-code response) 200) - (let* ((json-body (json-string->scm - (utf8->string body))) + (let* ((json-body (json->scm body)) (recent-changes (assoc-ref json-body "recent_changes"))) - (log-msg 'INFO "queried for recent changes since " - latest-recent-change) (log-msg 'INFO "got " (vector-length recent-changes) " changes") ;; Switch to symbol keys and standardise the key order @@ -118,46 +122,75 @@ (assq-ref change-details 'datetime)) - (let ((new-files-count (length (narinfo-uris narinfo)))) - (metric-increment nar-files-metric - #:by new-files-count - ;; TODO This should be - ;; checked, rather than - ;; assumed to be false - #:label-values '((stored . "false")))))) + (and=> (metrics-registry-fetch-metric metrics-registry + "nar_files_total") + (lambda (metric) + ;; Just update this metric if it + ;; exists, since if it does, it + ;; should be set to a value + (let ((new-files-count + (length (narinfo-uris narinfo)))) + (metric-increment + metric + #:by new-files-count + ;; TODO This should be + ;; checked, rather than + ;; assumed to be false + #:label-values '((stored . "false")))))))) ((string=? 
change "removal") (let ((store-path (assq-ref change-details 'data))) (log-msg 'INFO "processing removal change for " store-path " (" (assq-ref change-details 'datetime) ")") + (let* ((hash (store-path-hash-part store-path)) + (narinfo-details + (database-select-narinfo-by-hash + database + hash))) + (when storage-root (remove-nar-files-by-hash database storage-root metrics-registry - (store-path-hash-part store-path))) + hash)) + + (let ((cached-narinfo-files + (database-select-cached-narinfo-files-by-narinfo-id + database + (assq-ref narinfo-details 'id)))) + (for-each + (lambda (cached-narinfo-file-details) + ;; TODO Delete the file as well + + (let ((reply (make-channel))) + (put-message + cached-compression-management-channel + (list 'cached-narinfo-removed + (assq-ref narinfo-details 'id) + (assq-ref cached-narinfo-files 'compression) + (assq-ref cached-narinfo-files 'size) + reply)) + (get-message reply))) + cached-narinfo-files)) (database-remove-narinfo database store-path #:change-datetime (assq-ref change-details - 'datetime)))) + 'datetime))))) (else (error "unimplemented")))))) recent-changes)) (raise-exception (make-exception-with-message - (simple-format #f "unknown response: ~A\n code: ~A response: ~A" + (simple-format #f "unknown response: ~A code: ~A" (uri->string uri) - (response-code response) - (utf8->string body)))))))) + (response-code response)))))))) - (call-with-new-thread + (spawn-fiber (lambda () - ;; This will initialise the nar_files_total metric - (get-nar-files database storage-root metrics-registry) - (while #t (with-exception-handler (lambda (exn) @@ -165,4 +198,5 @@ request-recent-changes #:unwind? 
#t) + (log-msg 'DEBUG "finished requesting recent changes, sleeping") (sleep 60))))) diff --git a/nar-herder/recent-changes.scm b/nar-herder/recent-changes.scm index fee63f3..ccfff93 100644 --- a/nar-herder/recent-changes.scm +++ b/nar-herder/recent-changes.scm @@ -18,15 +18,25 @@ (define-module (nar-herder recent-changes) #:use-module (srfi srfi-1) + #:use-module (ice-9 match) #:use-module (ice-9 threads) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (logging logger) + #:use-module (prometheus) + #:use-module (web uri) + #:use-module (guix narinfo) #:use-module (nar-herder database) - #:export (start-recent-change-removal-and-database-dump-thread)) + #:export (start-recent-change-removal-and-database-dump-fiber + start-recent-change-listener-fiber)) -(define (start-recent-change-removal-and-database-dump-thread database - database-dump-filename - check-interval - recent-changes-limit) +(define (start-recent-change-removal-and-database-dump-fiber database + metrics-registry + database-dump-filename + check-interval + recent-changes-limit) (define (update-database-dump) + (log-msg 'DEBUG "updating the database dump at " database-dump-filename) (let ((temp-database-dump-filename (string-append database-dump-filename ".tmp"))) @@ -41,23 +51,127 @@ (simple-format (current-error-port) "updated database dump\n"))) - (call-with-new-thread + (define recent-changes-count-metric + (metrics-registry-fetch-metric metrics-registry "recent_changes_count")) + + (spawn-fiber (lambda () (while #t - (let ((recent-changes-id-for-deletion - (database-get-recent-changes-id-for-deletion database - recent-changes-limit))) - (when recent-changes-id-for-deletion - (update-database-dump) - - (let ((deleted-recent-changes - (database-delete-recent-changes-with-id-below - database - recent-changes-id-for-deletion))) - (simple-format (current-error-port) - "deleted ~A recent changes\n" - deleted-recent-changes))) - - (sleep check-interval)))))) + 
(with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception in recent change removal thread: ~A\n" + exn) + (sleep 120)) + (lambda () + (let ((recent-changes-id-for-deletion + (database-get-recent-changes-id-for-deletion database + recent-changes-limit))) + (when recent-changes-id-for-deletion + (when database-dump-filename + (update-database-dump)) + + (let ((deleted-recent-changes + (database-call-with-transaction + database + (lambda _ + (database-delete-recent-changes-with-id-below + database + recent-changes-id-for-deletion))))) + + (metric-decrement recent-changes-count-metric + #:by deleted-recent-changes) + + (simple-format (current-error-port) + "deleted ~A recent changes\n" + deleted-recent-changes))) + + (sleep check-interval))) + #:unwind? #t))))) + +(define (start-recent-change-listener-fiber database + metrics-registry + addition-channel + removal-channel) + (define recent-changes-count-metric + (metrics-registry-fetch-metric metrics-registry "recent_changes_count")) + + (define (process-addition-change change-details) + (let ((narinfo + (call-with-input-string + (assq-ref change-details 'data) + (lambda (port) + (read-narinfo port + "https://narherderdummyvalue"))))) + (for-each + (lambda (uri) + (log-msg 'DEBUG "processing recent addition of " (uri-path uri)) + (put-message addition-channel (list 'addition (uri-path uri)))) + (narinfo-uris narinfo)))) + + (define (process-removal-change change-details) + (log-msg 'DEBUG "processing recent change triggered removal of " + (assq-ref change-details 'data)) + (put-message removal-channel + (list 'remove (assq-ref change-details 'data)))) + + (spawn-fiber + (lambda () + (let ((recent-changes-count + (database-count-recent-changes database))) + (metric-set recent-changes-count-metric recent-changes-count) + (log-msg 'DEBUG recent-changes-count " recent changes in the database")) + + (log-msg 'DEBUG "starting to listen for recent changes") + (let ((after-initial + 
(database-select-latest-recent-change-datetime database))) + (let loop ((after after-initial) + (last-processed-recent-changes + (database-select-recent-changes database after-initial))) + (sleep 10) + + (match + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "exception in recent change listener " exn) + #f) + (lambda () + (let* ((recent-changes + (database-select-recent-changes database after)) + (unprocessed-recent-changes + (remove + (lambda (change-details) + (member change-details last-processed-recent-changes)) + recent-changes))) + + (unless (null? unprocessed-recent-changes) + (log-msg 'INFO "processing " (length unprocessed-recent-changes) + " recent changes") + + (metric-increment recent-changes-count-metric + #:by (length unprocessed-recent-changes)) + + (for-each + (lambda (change-details) + (let ((change (assq-ref change-details 'change))) + (cond + ((string=? change "addition") + (process-addition-change change-details)) + ((string=? change "removal") + (process-removal-change change-details)) + (else #f)))) + unprocessed-recent-changes)) + ;; Use the unprocessed recent changes here to carry + ;; forward all processed changes to the next pass + unprocessed-recent-changes)) + #:unwind? #t) + (#f (loop after '())) + (recent-changes + (if (null? 
recent-changes) + (loop after last-processed-recent-changes) + (loop (assq-ref (last recent-changes) + 'datetime) + recent-changes))))))))) diff --git a/nar-herder/server.scm b/nar-herder/server.scm index 522ff3f..a714fb3 100644 --- a/nar-herder/server.scm +++ b/nar-herder/server.scm @@ -19,39 +19,65 @@ (define-module (nar-herder server) #:use-module (srfi srfi-1) #:use-module (srfi srfi-11) + #:use-module (srfi srfi-19) #:use-module (srfi srfi-34) + #:use-module (srfi srfi-71) + #:use-module (ice-9 ftw) + #:use-module (ice-9 iconv) #:use-module (ice-9 match) + #:use-module (ice-9 threads) #:use-module (ice-9 binary-ports) #:use-module (rnrs bytevectors) #:use-module (web uri) + #:use-module (web client) #:use-module (web response) #:use-module (web request) #:use-module (logging logger) + #:use-module (fibers) + #:use-module (fibers channels) + #:use-module (fibers scheduler) + #:use-module (fibers conditions) + #:use-module (fibers operations) #:use-module (prometheus) #:use-module (json) #:use-module ((system foreign) #:select (bytevector->pointer pointer->bytevector)) #:use-module (guix store) #:use-module (guix base32) + #:use-module (guix progress) #:use-module (guix serialization) #:use-module ((guix utils) #:select (decompressed-port)) #:use-module ((guix build utils) #:select (dump-port)) + #:use-module ((guix build syscalls) #:select (set-thread-name)) #:use-module (nar-herder database) #:use-module (nar-herder storage) + #:use-module (nar-herder utils) + #:use-module (nar-herder mirror) + #:use-module (nar-herder recent-changes) + #:use-module (nar-herder cached-compression) #:use-module (ice-9 textual-ports) - #:export (make-request-handler)) + #:export (%compression-options + + run-nar-herder-service + make-request-handler)) + +(define %compression-options + '(gzip lzip zstd none)) (define* (render-json json #:key (extra-headers '()) (code 200)) (values (build-response #:code code #:headers (append extra-headers - '((content-type . 
(application/json)) + '((content-type . (application/json + (charset . "utf-8"))) (vary . (accept))))) - (lambda (port) - (scm->json json port)))) + (call-with-encoded-output-string + "utf-8" + (lambda (port) + (scm->json json port))))) (define (parse-query-string query) (let lp ((lst (map uri-decode (string-split query (char-set #\& #\=))))) @@ -61,32 +87,6 @@ (("") '()) (() '())))) -(define (get-gc-metrics-updater registry) - (define metrics - `((gc-time-taken - . ,(make-gauge-metric registry "guile_gc_time_taken")) - (heap-size - . ,(make-gauge-metric registry "guile_heap_size")) - (heap-free-size - . ,(make-gauge-metric registry "guile_heap_free_size")) - (heap-total-allocated - . ,(make-gauge-metric registry "guile_heap_total_allocated")) - (heap-allocated-since-gc - . ,(make-gauge-metric registry "guile_allocated_since_gc")) - (protected-objects - . ,(make-gauge-metric registry "guile_gc_protected_objects")) - (gc-times - . ,(make-gauge-metric registry "guile_gc_times")))) - - (lambda () - (let ((stats (gc-stats))) - (for-each - (match-lambda - ((name . 
metric) - (let ((value (assq-ref stats name))) - (metric-set metric value)))) - metrics)))) - (define (serve-fixed-output-file input compression proc) ;; TODO It's hard with fold-archive from (guix serialization) to ;; read just the singular file from the archive, so the following @@ -121,10 +121,11 @@ (define (read-string p) (utf8->string (read-byte-string p))) - (let*-values (((port pids) - (decompressed-port - (string->symbol compression) - input))) + (let ((port + pids + (decompressed-port + (string->symbol compression) + input))) ;; The decompressor can be an external program, so wait for it to ;; exit @@ -147,26 +148,78 @@ (proc port size))))) +(define (add-cached-compressions-to-narinfo initial-narinfo-contents + cached-narinfo-files) + (let ((cached-nar-strings + (map (lambda (cached-nar-details) + (let ((compression + (symbol->string + (assq-ref cached-nar-details 'compression)))) + (string-append + "URL: nar/" compression "/" + (uri-encode + (store-path-base + (assq-ref cached-nar-details 'store-path))) + "\n" + "Compression: " compression "\n" + "FileSize: " (number->string + (assq-ref cached-nar-details 'size)) + "\n"))) + cached-narinfo-files))) + (string-append + initial-narinfo-contents + (string-join + cached-nar-strings + "\n")))) + (define* (make-request-handler database storage-root - #:key ttl negative-ttl logger - metrics-registry) + #:key base-ttl base-cached-compressions-ttl + negative-ttl logger + metrics-registry + maybe-trigger-creation-of-cached-nars + cached-compression-nar-requested-hook) + (define hostname + (gethostname)) + (define (narinfo? str) (and (= (string-length str) 40) (string-suffix? 
".narinfo" str))) + (define plain-metrics-registry + (make-metrics-registry)) + (define gc-metrics-updater - (get-gc-metrics-updater metrics-registry)) + (get-gc-metrics-updater plain-metrics-registry)) + + (define process-metrics-updater + (get-process-metrics-updater plain-metrics-registry)) + + (define guile-time-metrics-updater + (let ((internal-real-time + (make-gauge-metric plain-metrics-registry "guile_internal_real_time")) + (internal-run-time + (make-gauge-metric plain-metrics-registry "guile_internal_run_time"))) + (lambda () + (metric-set internal-real-time + (get-internal-real-time)) + (metric-set internal-run-time + (get-internal-run-time))))) (define requests-total-metric (make-counter-metric metrics-registry "server_requests_total")) - (define (increment-request-metric category response-code) + (define* (increment-request-metric category response-code #:key (labels '())) (metric-increment requests-total-metric #:label-values `((category . ,category) - (response_code . ,response-code)))) + (response_code . ,response-code) + ,@labels))) + + (define %compression-strings + (map symbol->string + %compression-options)) (lambda (request body) (log-msg logger @@ -178,30 +231,81 @@ (match (cons (request-method request) (split-and-decode-uri-path (uri-path (request-uri request)))) - (('GET (? narinfo? narinfo)) - (let ((narinfo-contents + (((or 'HEAD 'GET) (? narinfo? narinfo)) + (let ((base-narinfo-contents + narinfo-id (database-select-narinfo-contents-by-hash database (string-take narinfo 32)))) (increment-request-metric "narinfo" - (if narinfo-contents + (if base-narinfo-contents "200" "404")) - (if narinfo-contents - (values `((content-type . (text/plain)) - ,@(if ttl - `((cache-control (max-age . ,ttl))) - '())) - narinfo-contents) + (if base-narinfo-contents + (let* ((cached-narinfo-files + (database-select-cached-narinfo-files-by-narinfo-id + database + narinfo-id)) + (narinfo-contents + (if (null? 
cached-narinfo-files) + base-narinfo-contents + (add-cached-compressions-to-narinfo + base-narinfo-contents + cached-narinfo-files))) + (potential-ttls + (remove + not + `(,(if (null? cached-narinfo-files) + base-ttl + base-cached-compressions-ttl) + + ,(and=> (database-select-scheduled-narinfo-removal + database + narinfo-id) + (lambda (scheduled-removal-time) + (list + (max + (- (time-second + (time-difference scheduled-removal-time + (current-time))) + 60) + 0)))) + + ,@(if (null? cached-narinfo-files) + '() + (map + (lambda (details) + (and=> + (database-select-scheduled-cached-narinfo-removal + database + (assq-ref details 'id)) + (lambda (scheduled-removal-time) + (max + (- (time-second + (time-difference scheduled-removal-time + (current-time))) + 60) + 0)))) + cached-narinfo-files))))) + (ttl + (cond + ((null? potential-ttls) #f) + (else (apply min potential-ttls))))) + + (values `((content-type . (text/plain)) + ,@(if ttl + `((cache-control (max-age . ,ttl))) + '())) + narinfo-contents)) (values (build-response #:code 404 #:headers (if negative-ttl `((cache-control (max-age . ,negative-ttl))) '())) "404")))) - (('GET (? narinfo? narinfo) "info") + (((or 'HEAD 'GET) (? narinfo? narinfo) "info") (let ((narinfo-contents (database-select-narinfo-contents-by-hash database @@ -220,42 +324,133 @@ (string-take narinfo 32))))) (values (build-response #:code 404) "404")))) - (('GET "nar" compression filename) - (let* ((hash (string-take filename 32)) + (((or 'HEAD 'GET) "nar" compression filename) + (let* ((hash (and (>= (string-length filename) 32) + (string-take filename 32))) + (narinfo + (and hash + (database-select-narinfo-by-hash + database + hash))) (narinfo-files - (database-select-narinfo-files - database - hash)) + (and=> (assq-ref narinfo 'id) + (lambda (id) + (database-select-narinfo-files-by-narinfo-id + database + id)))) (narinfo-file-for-compression (find (lambda (file) - (string=? 
(assq-ref file 'compression) - compression)) - narinfo-files))) - - (when (or narinfo-file-for-compression - ;; Check for a common compression to avoid lots of - ;; metrics being generated if compression is random - (member compression '("gzip" "lzip" "zstd"))) - (increment-request-metric - (string-append "nar/" - compression) - (if narinfo-file-for-compression "200" "404"))) + (and (string=? (assq-ref file 'compression) + compression) + (string=? + (last (string-split (assq-ref file 'url) + #\/)) + (uri-encode filename)))) + (or narinfo-files '()))) + (compression-symbol + (if (member + compression + %compression-strings + string=?) + (string->symbol compression) + #f))) (if narinfo-file-for-compression - (values (build-response - #:code 200 - #:headers `((X-Accel-Redirect - . ,(string-append - "/internal/nar/" - compression "/" - (uri-encode filename))))) - #f) - (values (build-response #:code 404) - "404")))) - (('GET "file" name algo hash) + (let ((loop? + (any + (lambda (via) + (string=? (last (string-split via #\space)) + hostname)) + (request-via request)))) + + (when (and (not loop?) + maybe-trigger-creation-of-cached-nars) + (maybe-trigger-creation-of-cached-nars + (assq-ref narinfo 'id))) + + (when loop? + (log-msg logger 'WARN + (request-method request) + " " + (uri-path (request-uri request)) + ": loop detected (" hostname "): " + (string-join (request-via request) ", "))) + + (increment-request-metric + (string-append "nar/" + compression) + (if loop? + "500" + "200") + #:labels + (let ((system (assq-ref narinfo 'system))) + (if (string? system) + `((system . ,system)) + '()))) + + (if loop? + (values (build-response #:code 500) + (simple-format #f "loop detected (~A): ~A\n" + hostname + (request-via request))) + (values (build-response + #:code 200 + #:headers `((X-Accel-Redirect + . 
,(string-append + "/internal/nar/" + compression "/" + (uri-encode filename))))) + #f))) + (let ((cached-narinfo-file + (and narinfo ; must be a known hash + compression-symbol ; must be a known compression + ;; Check that the filename given in the + ;; request matches the narinfo store-path + (string=? filename + (basename + (assq-ref narinfo 'store-path))) + (database-select-cached-narinfo-file-by-hash + database + hash + compression-symbol)))) + + (when (or cached-narinfo-file + ;; Check for a common compression to avoid lots of + ;; metrics being generated if compression is random + compression-symbol) + (increment-request-metric + (string-append "nar/" + compression) + (if cached-narinfo-file "200" "404") + #:labels + (if cached-narinfo-file + (let ((system (assq-ref narinfo 'system))) + (if (string? system) + `((system . ,system)) + '())) + '()))) + + (when cached-narinfo-file + (cached-compression-nar-requested-hook compression-symbol + filename)) + + (if cached-narinfo-file + (values (build-response + #:code 200 + #:headers `((X-Accel-Redirect + . ,(string-append + "/internal/cached-nar/" + compression "/" + (uri-encode filename))))) + #f) + (values (build-response #:code 404) + "404")))))) + (((or 'HEAD 'GET) "file" name algo hash) (guard (c ((invalid-base32-character? c) (values (build-response #:code 404) - "404"))) + (if (eq? (request-method request) 'HEAD) + #f + "404")))) (let ((hash-bytevector (nix-base32-string->bytevector hash))) (if (and (string=? algo "sha256") (= 32 (bytevector-length hash-bytevector))) @@ -272,43 +467,54 @@ store-path-hash)) (selected-narinfo-file ;; TODO Select intelligently - (first narinfo-files))) + (if (null? narinfo-files) + #f + (first narinfo-files))) + (filename + (and selected-narinfo-file + (let ((filename + (string-append + storage-root + (uri-decode + (assq-ref selected-narinfo-file 'url))))) + (and (file-exists? 
filename) + filename))))) (increment-request-metric "file" - (if selected-narinfo-file "200" "404")) - - (if selected-narinfo-file - (let* ((url - (assq-ref selected-narinfo-file 'url)) - (filename - (string-append storage-root - (uri-decode url)))) - - (serve-fixed-output-file - (open-input-file filename) - (assq-ref selected-narinfo-file - 'compression) - (lambda (nar-port bytes) - (values `((content-type . (application/octet-stream - (charset . "ISO-8859-1"))) - (content-length . ,bytes)) - (lambda (output-port) - (dump-port nar-port - output-port - bytes) - - (close-port output-port)))))) + (if filename "200" "404")) + + (if filename + (serve-fixed-output-file + (open-input-file filename) + (assq-ref selected-narinfo-file + 'compression) + (lambda (nar-port bytes) + (values `((content-type . (application/octet-stream + (charset . "ISO-8859-1"))) + (content-length . ,bytes)) + (if (eq? (request-method request) 'HEAD) + #f + (lambda (output-port) + (dump-port nar-port + output-port + bytes) + + (close-port nar-port)))))) (values (build-response #:code 404) - "404"))) + (if (eq? (request-method request) 'HEAD) + #f + "404")))) (begin (increment-request-metric "file" "404") (values (build-response #:code 404) - "404")))))) + (if (eq? (request-method request) 'HEAD) + #f + "404"))))))) - (('GET "recent-changes") + (((or 'HEAD 'GET) "recent-changes") (let ((query-parameters (or (and=> (uri-query (request-uri request)) parse-query-string) @@ -323,7 +529,7 @@ (or (assoc-ref query-parameters "since") "1970-01-01 00:00:01")))))))) - (('GET "latest-database-dump") + (((or 'HEAD 'GET) "latest-database-dump") (increment-request-metric "latest-database-dump" "200") @@ -331,20 +537,378 @@ #:code 200 #:headers '((X-Accel-Redirect . "/internal/database/nar_herder_dump.db"))) #f)) - (('GET "metrics") + (((or 'HEAD 'GET) "metrics") (gc-metrics-updater) + (process-metrics-updater) + (guile-time-metrics-updater) + (update-database-metrics! 
database) (increment-request-metric "metrics" "200") (values (build-response #:code 200 #:headers '((content-type . (text/plain)) (vary . (accept)))) - (lambda (port) - (write-metrics metrics-registry - port)))) + (call-with-output-string + (lambda (port) + (write-metrics metrics-registry port) + (write-metrics plain-metrics-registry port))))) (_ (increment-request-metric "unhandled" "404") (values (build-response #:code 404) "404"))))) +(define* (run-nar-herder-service opts lgr) + (define (download-database) + (let ((database-uri + (string->uri + (string-append (assq-ref opts 'mirror) + "/latest-database-dump")))) + (with-port-timeouts + (lambda () + (call-with-values + (lambda () + (simple-format (current-error-port) + "starting downloading the database\n") + (let ((port + socket + (open-socket-for-uri* database-uri))) + (http-get database-uri + #:port port + #:streaming? #t))) + (lambda (response body) + (when (not (= (response-code response) 200)) + (error "unable to fetch database from mirror")) + + (let* ((reporter (progress-reporter/file + (uri->string database-uri) + (response-content-length response) + (current-error-port))) + (port + (progress-report-port + reporter + body + #:download-size (response-content-length response)))) + + (call-with-output-file (assq-ref opts 'database) + (lambda (output-port) + (dump-port port output-port))) + + (close-port port)) + + (simple-format (current-error-port) + "finished downloading the database\n")))) + #:timeout 30))) + + (define metrics-registry + (make-metrics-registry + #:namespace + "narherder")) + + (and=> + (assq-ref opts 'mirror) + (lambda (mirror) + (let ((database-file (assq-ref opts 'database))) + (if (file-exists? database-file) + (begin + ;; TODO Open the database, and check if the + ;; latest changes in the database are visible on + ;; the source to mirror. 
If they're not, then + ;; delete the database and download it to get + ;; back in sync + + #f) + (download-database))))) + + ;; Used elsewhere + (make-gauge-metric metrics-registry "recent_changes_count") + + (let ((recent-changes-metric + (make-gauge-metric metrics-registry "recent_changes_limit"))) + (metric-set recent-changes-metric (assq-ref opts 'recent-changes-limit))) + + (define maintenance-scheduler + (make-scheduler #:parallelism 1)) + + (let* ((database (setup-database (assq-ref opts 'database) + metrics-registry + #:reader-threads + (assq-ref opts 'database-reader-threads))) + (canonical-storage (and=> (assq-ref opts 'storage) + canonicalize-path)) + + (enabled-cached-compressions + (let ((explicit-cached-compression-directories + (filter-map + (match-lambda + (('cached-compression-directory . details) details) + (_ #f)) + opts)) + (cached-compression-directories-max-sizes + (filter-map + (match-lambda + (('cached-compression-directory-max-size . details) details) + (_ #f)) + opts)) + (cached-compression-ttls + (filter-map + (match-lambda + (('cached-compression-ttl . details) details) + (_ #f)) + opts)) + (cached-compression-new-ttls + (filter-map + (match-lambda + (('cached-compression-new-ttl . details) details) + (_ #f)) + opts)) + (cached-compression-unused-removal-durations + (filter-map + (match-lambda + (('cached-compression-unused-removal-duration . details) + details) + (_ #f)) + opts))) + + (filter-map + (match-lambda + (('cached-compression . details) + (let ((compression + (assq-ref details 'type))) + (cons compression + `(,@(alist-delete 'type details) + (directory + . ,(or (assq-ref explicit-cached-compression-directories + compression) + (simple-format #f "/var/cache/nar-herder/nar/~A" + compression))) + (directory-max-size + . ,(assq-ref cached-compression-directories-max-sizes + compression)) + (ttl + . ,(assq-ref cached-compression-ttls + compression)) + (new-ttl + . 
,(assq-ref cached-compression-new-ttls + compression)) + (unused-removal-duration + . ,(assq-ref cached-compression-unused-removal-durations + compression)))))) + (_ #f)) + opts))) + + (cached-compression-min-uses + (assq-ref opts 'cached-compression-min-uses)) + + (cached-compression-management-channel + (if (null? enabled-cached-compressions) + #f + (start-cached-compression-management-fiber + database + metrics-registry + (or (assq-ref opts 'cached-compression-nar-source) + canonical-storage) + enabled-cached-compressions + cached-compression-min-uses + #:cached-compression-workers + (assq-ref opts 'cached-compression-workers) + #:scheduler maintenance-scheduler))) + + (maybe-trigger-creation-of-cached-nars + (if (null? enabled-cached-compressions) + #f + (lambda (narinfo-id) + (spawn-fiber + (lambda () + (put-message cached-compression-management-channel + (cons 'narinfo-id narinfo-id))) + maintenance-scheduler)))) + + (cached-compression-nar-requested-hook + (if (null? enabled-cached-compressions) + #f + (lambda (compression filename) + (spawn-fiber + (lambda () + (let* ((directory + (assq-ref (assq-ref enabled-cached-compressions + compression) + 'directory))) + (utime (string-append directory "/" filename)))) + maintenance-scheduler))))) + + (if (string=? (assq-ref opts 'database-dump) + "disabled") + (log-msg 'INFO "database dump disabled") + (when (not (file-exists? (assq-ref opts 'database-dump))) + (log-msg 'INFO "dumping database...") + (dump-database database (assq-ref opts 'database-dump)))) + + (let ((finished? (make-condition))) + (call-with-new-thread + (lambda () + (catch 'system-error + (lambda () + (set-thread-name "maintenance")) + (const #t)) + + (run-fibers + (lambda () + (when canonical-storage + (initialise-storage-metrics + database + canonical-storage + metrics-registry)) + + (start-recent-change-removal-and-database-dump-fiber + database + metrics-registry + (let ((filename (assq-ref opts 'database-dump))) + (if (string=? 
filename "disabled") + #f + filename)) + (* 24 3600) ; 24 hours + (assq-ref opts 'recent-changes-limit)) + + (let ((mirror-channel + (and=> + (assq-ref opts 'mirror) + (lambda (mirror) + (start-fetch-changes-fiber + database + metrics-registry + canonical-storage ; might be #f, but that's fine here + mirror + cached-compression-management-channel) + + (if (assq-ref opts 'storage) + (start-mirroring-fiber database + mirror + (assq-ref opts 'storage-limit) + canonical-storage + metrics-registry) + #f)))) + (removal-channel + (let ((nar-removal-criteria + (filter-map + (match-lambda + ((key . val) + (if (eq? key 'storage-nar-removal-criteria) + val + #f))) + opts))) + (if (and (assq-ref opts 'storage) + (number? (assq-ref opts 'storage-limit)) + (not (null? nar-removal-criteria))) + (start-nar-removal-fiber database + canonical-storage + (assq-ref opts 'storage-limit) + metrics-registry + nar-removal-criteria) + #f))) + (addition-channel (make-channel))) + + (spawn-fiber + (lambda () + (while #t + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "exception processing addition-channel: " + exn)) + (lambda () + (match (get-message addition-channel) + (('addition file) + (when mirror-channel + (put-message mirror-channel + `(fetch ,file))) + (when removal-channel + (spawn-fiber + (lambda () + (sleep 60) + (removal-channel-remove-nar-from-storage removal-channel + file) + (sleep (* 5 60)) + (removal-channel-remove-nar-from-storage removal-channel + file) + (sleep (* 15 60)) + (removal-channel-remove-nar-from-storage removal-channel + file) + (sleep 3600) + (removal-channel-remove-nar-from-storage removal-channel + file))))))) + #:unwind? #t)))) + + (start-recent-change-listener-fiber + database + metrics-registry + addition-channel + removal-channel)) + + (unless (null? 
enabled-cached-compressions) + (let ((cached-compression-removal-fiber-wakeup-channel + (start-cached-compression-removal-fiber + database + cached-compression-management-channel + enabled-cached-compressions))) + (start-cached-compression-schedule-removal-fiber + database + metrics-registry + cached-compression-management-channel + enabled-cached-compressions + cached-compression-removal-fiber-wakeup-channel + (or (assq-ref opts 'narinfo-ttl) + ;; Default from (guix substitutes) + (* 36 3600))))) + + (log-msg 'DEBUG "finished maintenance setup") + (wait finished?)) + #:scheduler maintenance-scheduler + #:hz 0 + #:parallelism 1))) + + (call-with-sigint + (lambda () + (run-fibers + (lambda () + (let* ((current (current-scheduler)) + (schedulers + (cons current (scheduler-remote-peers current)))) + (for-each + (lambda (i sched) + (spawn-fiber + (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append "fibers " (number->string i)))) + (const #t))) + sched)) + (iota (length schedulers)) + schedulers)) + + (log-msg 'INFO "starting server, listening on " + (assq-ref opts 'host) ":" (assq-ref opts 'port)) + + (run-server/patched + (make-request-handler + database + canonical-storage + #:base-ttl (or (assq-ref opts 'new-narinfo-ttl) + (assq-ref opts 'narinfo-ttl)) + #:base-cached-compressions-ttl + (or (assq-ref opts 'new-cached-compressions-narinfo-ttl) + (assq-ref opts 'cached-compressions-narinfo-ttl)) + #:negative-ttl (assq-ref opts 'narinfo-negative-ttl) + #:logger lgr + #:metrics-registry metrics-registry + #:maybe-trigger-creation-of-cached-nars + maybe-trigger-creation-of-cached-nars + #:cached-compression-nar-requested-hook + cached-compression-nar-requested-hook) + #:host (assq-ref opts 'host) + #:port (assq-ref opts 'port)) + + (wait finished?)) + #:hz 0 + #:parallelism (assq-ref opts 'parallelism))) + finished?)))) diff --git a/nar-herder/storage.scm b/nar-herder/storage.scm index c017685..fc49b2d 100644 --- a/nar-herder/storage.scm 
+++ b/nar-herder/storage.scm @@ -18,6 +18,7 @@ (define-module (nar-herder storage) #:use-module (srfi srfi-1) + #:use-module (srfi srfi-71) #:use-module (ice-9 ftw) #:use-module (ice-9 match) #:use-module (ice-9 threads) @@ -25,21 +26,27 @@ #:use-module (web uri) #:use-module (web client) #:use-module (web response) + #:use-module (fibers) + #:use-module (fibers channels) #:use-module (logging logger) #:use-module (logging port-log) #:use-module (prometheus) #:use-module (json) #:use-module ((guix build utils) #:select (dump-port mkdir-p)) #:use-module ((guix store) #:select (store-path-hash-part)) + #:use-module (guix progress) #:use-module (nar-herder utils) #:use-module (nar-herder database) #:export (store-item-in-local-storage? remove-nar-files-by-hash - get-nar-files + initialise-storage-metrics + check-storage - start-nar-removal-thread - start-mirroring-thread)) + removal-channel-remove-nar-from-storage + + start-nar-removal-fiber + start-mirroring-fiber)) (define (store-item-in-local-storage? database storage-root hash) (let ((narinfo-files (database-select-narinfo-files database hash))) @@ -52,17 +59,12 @@ (assq-ref file 'url))))) narinfo-files))) -(define (remove-nar-files-by-hash database storage-root metrics-registry - hash) - (define nar-files-metric - (or (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (make-gauge-metric metrics-registry - "nar_files_total" - #:labels '(stored)))) - +(define* (remove-nar-files-by-hash database storage-root metrics-registry + hash + #:key (error-unless-files-to-remove? #t)) (let ((narinfo-files (database-select-narinfo-files database hash))) - (when (null? narinfo-files) + (when (and (null? narinfo-files) + error-unless-files-to-remove?) (error "no narinfo files")) (for-each (lambda (file) @@ -76,9 +78,14 @@ (remove-nar-from-storage storage-root (assq-ref file 'url))) - (metric-decrement nar-files-metric - #:label-values - `((stored . ,(if exists? 
"true" "false")))))) + (and=> (metrics-registry-fetch-metric metrics-registry + "nar_files_total") + (lambda (metric) + ;; Just update this metric if it exists, since if it + ;; does, it should be set to a value + (metric-decrement + metric + #:label-values `((stored . ,(if exists? "true" "false")))))))) narinfo-files))) (define (get-storage-size storage-root) @@ -159,50 +166,223 @@ (unrecognised-files . ,(hash-map->list (lambda (key _) key) files-hash))))) -(define* (get-nar-files database storage-root metrics-registry - #:key stored?) - (define nar-files-metric - (or (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (make-gauge-metric metrics-registry - "nar_files_total" - #:labels '(stored)))) - - (let* ((index (index-storage database storage-root)) - (selected-files - (filter - (lambda (file) - (eq? (assq-ref file 'stored?) stored?)) - (assq-ref index 'narinfo-files)))) - - (let ((selected-files-count - (length selected-files)) - (all-files-count - (length (assq-ref index 'narinfo-files)))) - - (metric-set nar-files-metric - selected-files-count - #:label-values `((stored . ,(if stored? "true" "false")))) - (metric-set nar-files-metric - (- all-files-count selected-files-count) - #:label-values `((stored . ,(if stored? "false" "true"))))) - - selected-files)) - -(define (start-nar-removal-thread database - storage-root storage-limit - metrics-registry - nar-removal-criteria) +;; TODO Maybe remove the metrics-registry argument? +(define* (fold-nar-files database storage-root metrics-registry + proc init + #:key stored?) + (define stored-files-count 0) + (define not-stored-files-count 0) + + (let ((result + (database-fold-all-narinfo-files + database + (lambda (nar result) + (let* ((url + (uri-decode + (assq-ref nar 'url))) + (nar-stored? + (file-exists? + (string-append storage-root url)))) + + (if nar-stored? + (set! stored-files-count (1+ stored-files-count)) + (set! not-stored-files-count (1+ not-stored-files-count))) + + (if (or (eq? 
stored? 'both) + (and stored? nar-stored?) + (and (not stored?) + (not nar-stored?))) + (proc nar result) + result))) + init))) + + (values result + `((stored . ,stored-files-count) + (not-stored . ,not-stored-files-count))))) + +(define* (update-nar-files-metric metrics-registry + nar-file-counts + #:key fetched-count removed-count) + + ;; Avoid incrementing or decrementing the metric if it hasn't been + ;; set yet + (when (or (metrics-registry-fetch-metric metrics-registry + "nar_files_total") + (= (length nar-file-counts) 2)) + + (let ((nar-files-metric + (or (metrics-registry-fetch-metric metrics-registry + "nar_files_total") + (make-gauge-metric metrics-registry + "nar_files_total" + #:labels '(stored))))) + + ;; Set the values if the counts are known + (and=> + (assq-ref nar-file-counts 'stored) + (lambda (stored-count) + (metric-set nar-files-metric + stored-count + #:label-values '((stored . "true"))))) + (and=> + (assq-ref nar-file-counts 'not-stored) + (lambda (not-stored-count) + (metric-set nar-files-metric + not-stored-count + #:label-values '((stored . "false"))))) + + ;; Then adjust by the fetched or removed counts + (when fetched-count + (metric-increment nar-files-metric + #:by fetched-count + #:label-values '((stored . "true"))) + (metric-decrement nar-files-metric + #:by fetched-count + #:label-values '((stored . "false")))) + (when removed-count + (metric-decrement nar-files-metric + #:by removed-count + #:label-values '((stored . "true"))) + (metric-increment nar-files-metric + #:by removed-count + #:label-values '((stored . "false"))))))) + +(define (initialise-storage-metrics database storage-root metrics-registry) + ;; Use a database transaction to block changes + (database-call-with-transaction + database + (lambda _ + (log-msg 'INFO "starting to initialise storage metrics") + (let ((_ + counts + (fold-nar-files + database + storage-root + metrics-registry + (const #f) + #f + #:stored? 
'both))) + (update-nar-files-metric + metrics-registry + counts)) + (log-msg 'INFO "finished initialising storage metrics")))) + +(define (check-storage database storage-root metrics-registry) + (define files-count + (database-count-narinfo-files database)) + + (call-with-progress-reporter + (progress-reporter/bar files-count + (simple-format #f "checking ~A files" files-count) + (current-error-port)) + (lambda (report) + (fold-nar-files + database + storage-root + metrics-registry + (lambda (file _) + (let* ((full-filename + (string-append storage-root + (uri-decode (assq-ref file 'url)))) + (file-size + (stat:size (stat full-filename))) + (database-size + (assq-ref file 'size))) + (report) + (unless (= file-size database-size) + (newline) + (log-msg 'WARN "file " full-filename + " has inconsistent size (database: " + database-size ", file: " file-size ")")) + #f)) + #f + #:stored? 'both)))) + +(define (at-most max-length lst) + "If LST is shorter than MAX-LENGTH, return it and the empty list; otherwise +return its MAX-LENGTH first elements and its tail." + (let loop ((len 0) + (lst lst) + (result '())) + (match lst + (() + (values (reverse result) '())) + ((head . tail) + (if (>= len max-length) + (values (reverse result) lst) + (loop (+ 1 len) tail (cons head result))))))) + +(define %max-cached-connections + ;; Maximum number of connections kept in cache by + ;; 'open-connection-for-uri/cached'. + 16) + +(define open-socket-for-uri/cached + (let ((cache '())) + (lambda* (uri #:key fresh? verify-certificate?) + "Return a connection for URI, possibly reusing a cached connection. +When FRESH? is true, delete any cached connections for URI and open a new one. +Return #f if URI's scheme is 'file' or #f. + +When VERIFY-CERTIFICATE? is true, verify HTTPS server certificates." 
+ (define host (uri-host uri)) + (define scheme (uri-scheme uri)) + (define key (list host scheme (uri-port uri))) + + (and (not (memq scheme '(file #f))) + (match (assoc-ref cache key) + (#f + ;; Open a new connection to URI and evict old entries from + ;; CACHE, if any. + (let ((socket + (open-socket-for-uri* + uri + #:verify-certificate? verify-certificate?)) + (new-cache evicted + (at-most (- %max-cached-connections 1) cache))) + (for-each (match-lambda + ((_ . port) + (false-if-exception (close-port port)))) + evicted) + (set! cache (alist-cons key socket new-cache)) + socket)) + (socket + (if (or fresh? (port-closed? socket)) + (begin + (false-if-exception (close-port socket)) + (set! cache (alist-delete key cache)) + (open-socket-for-uri/cached uri + #:verify-certificate? + verify-certificate?)) + (begin + ;; Drain input left from the previous use. + (drain-input socket) + socket)))))))) + +(define (call-with-cached-connection uri proc) + (let ((port (open-socket-for-uri/cached uri))) + (with-throw-handler #t + (lambda () + (proc port)) + (lambda _ + (close-port port))))) + +(define (removal-channel-remove-nar-from-storage + channel file) + (let ((reply (make-channel))) + (put-message channel (list 'remove-from-storage reply file)) + (get-message reply))) + +(define (start-nar-removal-fiber database + storage-root storage-limit + metrics-registry + nar-removal-criteria) (define storage-size-metric (make-gauge-metric metrics-registry "storage_size_bytes")) - (define nar-files-metric - (or (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (make-gauge-metric metrics-registry - "nar_files_total" - #:labels '(stored)))) + (define removal-channel + (make-channel)) (define (check-removal-criteria nar criteria) (define narinfo @@ -222,21 +402,29 @@ (store-path-hash-part (assq-ref narinfo 'store-path)) ".narinfo/info")))) - (call-with-values - (lambda () - (retry-on-error + (with-port-timeouts + (lambda () + (call-with-values (lambda () - 
(http-get uri #:decode-body? #f)) - #:times 3 - #:delay 5)) - (lambda (response body) - (and (= (response-code response) - 200) - - (let ((json-body (json-string->scm - (utf8->string body)))) - (eq? (assoc-ref json-body "stored") - #t))))))))) + (retry-on-error + (lambda () + (call-with-cached-connection uri + (lambda (port) + (http-get uri + #:port port + #:decode-body? #f + #:keep-alive? #t + #:streaming? #t)))) + #:times 3 + #:delay 5)) + (lambda (response body) + (and (= (response-code response) + 200) + + (let ((json-body (json->scm body))) + (eq? (assoc-ref json-body "stored") + #t)))))) + #:timeout 30))))) (define (nar-can-be-removed? nar) (any (lambda (criteria) @@ -252,43 +440,98 @@ (metric-set storage-size-metric initial-storage-size) - (let loop ((storage-size - initial-storage-size) - (stored-nar-files - (with-time-logging "getting stored nar files" - (get-nar-files database storage-root metrics-registry - #:stored? #t)))) - ;; Look through items in local storage, check if the removal - ;; criteria have been met, and if so, delete it - - (unless (null? stored-nar-files) - (let ((nar-to-consider (car stored-nar-files))) - (if (nar-can-be-removed? nar-to-consider) - (begin - (remove-nar-from-storage - storage-root - (uri-decode - (assq-ref nar-to-consider 'url))) - - (metric-decrement nar-files-metric - #:label-values '((stored . "true"))) - (metric-increment nar-files-metric - #:label-values '((stored . 
"false"))) - - (let ((storage-size-estimate - (- storage-size - (assq-ref nar-to-consider 'size)))) - (when (> storage-size-estimate storage-limit) - (loop storage-size-estimate - (cdr stored-nar-files))))) - (loop storage-size - (cdr stored-nar-files))))))) - (log-msg 'INFO "finished looking for nars to remove")) + ;; Look through items in local storage, check if the removal + ;; criteria have been met, and if so, delete it + (let ((result + nar-file-counts + (fold-nar-files + database + storage-root + metrics-registry + (lambda (nar result) + (match result + ((storage-size . removed-count) + (if (and (> storage-size storage-limit) + (nar-can-be-removed? nar)) + (let ((response + (removal-channel-remove-nar-from-storage + removal-channel + (assq-ref nar 'url)))) + + (if (eq? response 'removed) + (let ((storage-size-estimate + (- storage-size + (assq-ref nar 'size)))) + (cons storage-size-estimate + (+ removed-count 1))) + (cons storage-size + removed-count))) + (cons storage-size + removed-count))))) + (cons initial-storage-size 0) + #:stored? #t))) + + (match result + ((storage-size . removed-count) + + (log-msg 'INFO "finished looking for nars to remove, removed " + removed-count " files")))))) (when (null? nar-removal-criteria) (error "must be some removal criteria")) - (call-with-new-thread + (spawn-fiber + (lambda () + (while #t + (match (get-message removal-channel) + (('remove-from-storage reply file) + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "nar remove from storage failed (" + file "): " exn) + (put-message reply + (cons 'exn exn))) + (lambda () + (with-throw-handler #t + (lambda () + (cond + ((not (file-exists? + (string-append storage-root + (uri-decode file)))) + (put-message reply 'does-not-exist)) + ((not (nar-can-be-removed? + `((url . 
,file)))) + (put-message reply + 'removal-criteria-not-met)) + (else + (remove-nar-from-storage + storage-root + (uri-decode file)) + + (update-nar-files-metric + metrics-registry + '() + #:removed-count 1) + + (put-message reply 'removed)))) + (lambda _ + (backtrace)))) + #:unwind? #t)) + (('remove file) + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "failed to remove " file ": " exn)) + (lambda () + ;; TODO: Do more checking at this point + (remove-nar-from-storage + storage-root + (uri-decode file)) + (update-nar-files-metric metrics-registry + '() + #:removed-count 1)) + #:unwind? #t)))))) + + (spawn-fiber (lambda () (while #t (with-exception-handler @@ -296,11 +539,12 @@ (log-msg 'ERROR "nar removal pass failed " exn)) run-removal-pass #:unwind? #t) + (sleep (* 60 60 24))))) - (sleep 300))))) + removal-channel) -(define (start-mirroring-thread database mirror storage-limit storage-root - metrics-registry) +(define (start-mirroring-fiber database mirror storage-limit storage-root + metrics-registry) (define no-storage-limit? (not (integer? storage-limit))) @@ -309,13 +553,6 @@ (make-gauge-metric metrics-registry "storage_size_bytes")) - (define nar-files-metric - (or (metrics-registry-fetch-metric metrics-registry - "nar_files_total") - (make-gauge-metric metrics-registry - "nar_files_total" - #:labels '(stored)))) - (define (fetch-file file) (let* ((string-url (string-append mirror file)) @@ -333,83 +570,149 @@ (when (file-exists? tmp-file-name) (delete-file tmp-file-name)) - (call-with-values - (lambda () - (http-get uri - #:decode-body? #f - #:streaming? #t)) - (lambda (response body) - (unless (= (response-code response) - 200) - (error "unknown response code")) - - (call-with-output-file tmp-file-name - (lambda (output-port) - (dump-port body output-port))) - (rename-file tmp-file-name - destination-file-name) - - (metric-increment nar-files-metric - #:label-values '((stored . 
"true"))) - (metric-decrement nar-files-metric - #:label-values '((stored . "false"))))))) + (with-exception-handler + (lambda (exn) + (when (file-exists? tmp-file-name) + (delete-file tmp-file-name)) + + (raise-exception exn)) + (lambda () + (with-port-timeouts + (lambda () + (call-with-values + (lambda () + (let ((port + socket + (open-socket-for-uri* uri))) + (http-get uri + #:port port + #:decode-body? #f + #:streaming? #t))) + (lambda (response body) + (unless (= (response-code response) + 200) + (error "unknown response code" + (response-code response))) + + (call-with-output-file tmp-file-name + (lambda (output-port) + (dump-port body output-port)))))) + #:timeout 30)) + #:unwind? #t) + + (rename-file tmp-file-name + destination-file-name) + + (update-nar-files-metric + metrics-registry + '() + #:fetched-count 1))) (define (download-nars initial-storage-size) ;; If there's free space, then consider downloading missing nars - (when (< initial-storage-size storage-limit) - (let loop ((storage-size initial-storage-size) - (missing-nar-files (get-nar-files - database storage-root metrics-registry - #:stored? #f))) - (unless (null? missing-nar-files) - (let ((file (car missing-nar-files))) - (log-msg 'DEBUG "considering " - (assq-ref file 'url)) - (let ((file-bytes (assq-ref file 'size))) - (if (or no-storage-limit? - (< (+ storage-size file-bytes) - storage-limit)) - (let ((success? - (with-exception-handler - (lambda (exn) - (log-msg 'ERROR "failed to fetch " - (assq-ref file 'url) - ": " exn) - #f) - (lambda () - (fetch-file (assq-ref file 'url)) - #t) - #:unwind? #t))) - (loop (if success? 
- (+ storage-size file-bytes) - storage-size) - (cdr missing-nar-files))) - ;; This file won't fit, so try the next one - (loop storage-size - (cdr missing-nar-files))))))))) + (if (< initial-storage-size storage-limit) + (let ((result + nar-file-counts + (fold-nar-files + database + storage-root + metrics-registry + (lambda (file result) + (log-msg 'DEBUG "considering " + (assq-ref file 'url)) + (match result + ((storage-size . fetched-count) + (let ((file-bytes (assq-ref file 'size))) + (if (or no-storage-limit? + (< (+ storage-size file-bytes) + storage-limit)) + (let ((success? + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "failed to fetch " + (assq-ref file 'url) + ": " exn) + #f) + (lambda () + (with-throw-handler #t + (lambda () + (retry-on-error + (lambda () + (fetch-file (assq-ref file 'url))) + #:times 3 + #:delay 5)) + (lambda _ + (backtrace))) + #t) + #:unwind? #t))) + (if success? + (cons (+ storage-size file-bytes) + (1+ fetched-count)) + result)) + ;; This file won't fit, so try the next one + result))))) + initial-storage-size + #:stored? #f))) + + (match result + ((storage-size . fetched-count) + fetched-count))) + 0)) (define (fast-download-nars) (define parallelism 3) - (let ((missing-nar-files (get-nar-files - database storage-root metrics-registry - #:stored? #f))) - (n-par-for-each - parallelism - (lambda (file) - (log-msg 'DEBUG "considering " - (assq-ref file 'url)) - (with-exception-handler - (lambda (exn) - (log-msg 'ERROR "failed to fetch " - (assq-ref file 'url) - ": " exn) - #f) - (lambda () - (fetch-file (assq-ref file 'url)) - #t) - #:unwind? #t)) - missing-nar-files))) + (let ((channel (make-channel))) + (for-each + (lambda _ + (spawn-fiber + (lambda () + (let loop ((fetched-count 0)) + (match (get-message channel) + (('finished . 
reply) + (put-message reply fetched-count)) + (url + (log-msg 'DEBUG "considering " url) + (loop + (+ fetched-count + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "failed to fetch " url ": " exn) + 0) + (lambda () + (retry-on-error + (lambda () + (fetch-file url)) + #:times 3 + #:delay 5) + 1) + #:unwind? #t))))))))) + (iota parallelism)) + + (let ((result + nar-file-counts + (fold-nar-files + database + storage-root + metrics-registry + (lambda (nar _) + (put-message channel + (assq-ref nar 'url)) + #f) + #f + #:stored? #f))) + + (let* ((reply-channel (make-channel)) + (fetched-count + (apply + + + (map + (lambda _ + (put-message channel + (cons 'finished reply-channel)) + (get-message reply-channel)) + (iota parallelism))))) + fetched-count)))) (define (run-mirror-pass) (log-msg 'DEBUG "running mirror pass") @@ -417,18 +720,38 @@ (get-storage-size storage-root)))) (metric-set storage-size-metric initial-storage-size) - (if no-storage-limit? - (fast-download-nars) - (download-nars initial-storage-size))) - (log-msg 'DEBUG "finished mirror pass")) - - (call-with-new-thread - (lambda () - (while #t - (with-exception-handler - (lambda (exn) - (log-msg 'ERROR "mirror pass failed " exn)) - run-mirror-pass - #:unwind? #t) - - (sleep 300))))) + (let ((fetched-count + (if no-storage-limit? + (fast-download-nars) + (download-nars initial-storage-size)))) + (log-msg 'DEBUG "finished mirror pass (fetched " fetched-count " nars)")))) + + (let ((channel (make-channel))) + (spawn-fiber + (lambda () + (while #t + (match (get-message channel) + ('full-pass + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "mirror pass failed " exn)) + run-mirror-pass + #:unwind? #t)) + (('fetch file) + (with-exception-handler + (lambda (exn) + (log-msg 'ERROR "failed to mirror " file ": " exn)) + (lambda () + (fetch-file file) + (update-nar-files-metric metrics-registry + '() + #:fetched-count 1)) + #:unwind? 
#t)))))) + + (spawn-fiber + (lambda () + (while #t + (put-message channel 'full-pass) + (sleep (* 60 60 24))))) + + channel)) diff --git a/nar-herder/utils.scm b/nar-herder/utils.scm index 2d62360..4755d33 100644 --- a/nar-herder/utils.scm +++ b/nar-herder/utils.scm @@ -18,44 +18,37 @@ (define-module (nar-herder utils) #:use-module (srfi srfi-1) + #:use-module (srfi srfi-9) #:use-module (srfi srfi-19) ; time #:use-module (ice-9 q) - ;; #:use-module (ice-9 ftw) - ;; #:use-module (ice-9 popen) #:use-module (ice-9 iconv) #:use-module (ice-9 match) #:use-module (ice-9 format) #:use-module (ice-9 threads) + #:use-module (ice-9 atomic) #:use-module (ice-9 textual-ports) #:use-module (ice-9 rdelim) #:use-module (ice-9 binary-ports) #:use-module (ice-9 exceptions) #:use-module (rnrs bytevectors) + #:use-module (ice-9 suspendable-ports) + #:use-module ((ice-9 ports internal) #:select (port-poll + port-read-wait-fd + port-write-wait-fd)) #:use-module (web uri) #:use-module (web http) #:use-module (web client) #:use-module (web request) #:use-module (web response) #:use-module (fibers) + #:use-module (fibers timers) #:use-module (fibers channels) + #:use-module (fibers scheduler) #:use-module (fibers conditions) - ;; #:use-module (gcrypt pk-crypto) - ;; #:use-module (gcrypt hash) - ;; #:use-module (gcrypt random) - ;; #:use-module (json) - ;; #:use-module (guix pki) - ;; #:use-module (guix utils) - ;; #:use-module (guix config) - ;; #:use-module (guix store) - ;; #:use-module (guix status) - ;; #:use-module (guix base64) - ;; #:use-module (guix scripts substitute) - #:export (call-with-streaming-http-request - &chunked-input-ended-prematurely - chunked-input-ended-prematurely-error? 
- make-chunked-input-port* - - make-worker-thread-channel + #:use-module (fibers operations) + #:use-module ((guix build syscalls) + #:select (set-thread-name)) + #:export (make-worker-thread-set call-with-worker-thread call-with-time-logging @@ -65,171 +58,20 @@ create-work-queue - check-locale!)) - -;; Chunked Responses -(define (read-chunk-header port) - "Read a chunk header from PORT and return the size in bytes of the -upcoming chunk." - (match (read-line port) - ((? eof-object?) - ;; Connection closed prematurely: there's nothing left to read. - (error "chunked input ended prematurely")) - (str - (let ((extension-start (string-index str - (lambda (c) - (or (char=? c #\;) - (char=? c #\return)))))) - (string->number (if extension-start ; unnecessary? - (substring str 0 extension-start) - str) - 16))))) - -(define &chunked-input-ended-prematurely - (make-exception-type '&chunked-input-error-prematurely - &external-error - '())) + check-locale! -(define make-chunked-input-ended-prematurely-error - (record-constructor &chunked-input-ended-prematurely)) - -(define chunked-input-ended-prematurely-error? - (record-predicate &chunked-input-ended-prematurely)) - -(define* (make-chunked-input-port* port #:key (keep-alive? #f)) - (define (close) - (unless keep-alive? - (close-port port))) - - (define chunk-size 0) ;size of the current chunk - (define remaining 0) ;number of bytes left from the current chunk - (define finished? #f) ;did we get all the chunks? - - (define (read! bv idx to-read) - (define (loop to-read num-read) - (cond ((or finished? (zero? to-read)) - num-read) - ((zero? remaining) ;get a new chunk - (let ((size (read-chunk-header port))) - (set! chunk-size size) - (set! remaining size) - (cond - ((zero? size) - (set! finished? #t) - (get-bytevector-n port 2) ; \r\n follows the last chunk - num-read) - (else - (loop to-read num-read))))) - (else ;read from the current chunk - (let* ((ask-for (min to-read remaining)) - (read (get-bytevector-n! 
port bv (+ idx num-read) - ask-for))) - (cond - ((eof-object? read) ;premature termination - (raise-exception - (make-chunked-input-ended-prematurely-error))) - (else - (let ((left (- remaining read))) - (set! remaining left) - (when (zero? left) - ;; We're done with this chunk; read CR and LF. - (get-u8 port) (get-u8 port)) - (loop (- to-read read) - (+ num-read read))))))))) - (loop to-read 0)) - - (make-custom-binary-input-port "chunked input port" read! #f #f close)) - -(define* (make-chunked-output-port* port #:key (keep-alive? #f) - (buffering 1200) - report-bytes-sent) - (define heap-allocated-limit - (expt 2 20)) ;; 1MiB - - (define (%put-string s) - (unless (string-null? s) - (let* ((bv (string->bytevector s "ISO-8859-1")) - (length (bytevector-length bv))) - (put-string port (number->string length 16)) - (put-string port "\r\n") - (put-bytevector port bv) - (put-string port "\r\n") - - (when report-bytes-sent - (report-bytes-sent length)) - (let* ((stats (gc-stats)) - (initial-gc-times - (assq-ref stats 'gc-times))) - (when (> (assq-ref stats 'heap-allocated-since-gc) - heap-allocated-limit) - (while (let ((updated-stats (gc-stats))) - (= (assq-ref updated-stats 'gc-times) - initial-gc-times)) - (gc) - (usleep 50))))))) - - (define (%put-char c) - (%put-string (list->string (list c)))) - - (define (flush) #t) - (define (close) - (put-string port "0\r\n\r\n") - (force-output port) - (unless keep-alive? - (close-port port))) - (let ((ret (make-soft-port - (vector %put-char %put-string flush #f close) "w"))) - (setvbuf ret 'block buffering) - ret)) - -(define* (call-with-streaming-http-request uri callback - #:key (headers '()) - (method 'PUT) - report-bytes-sent) - (let* ((port (open-socket-for-uri uri)) - (request - (build-request - uri - #:method method - #:version '(1 . 1) - #:headers `((connection close) - (Transfer-Encoding . "chunked") - (Content-Type . "application/octet-stream") - ,@headers) - #:port port))) - - (set-port-encoding! 
port "ISO-8859-1") - (setvbuf port 'block (expt 2 13)) - (with-exception-handler - (lambda (exp) - (simple-format #t "error: PUT ~A: ~A\n" (uri-path uri) exp) - (close-port port) - (raise-exception exp)) - (lambda () - (let ((request (write-request request port))) - (let* ((chunked-output-port - (make-chunked-output-port* - port - #:buffering (expt 2 12) - #:keep-alive? #t - #:report-bytes-sent report-bytes-sent))) - - ;; A SIGPIPE will kill Guile, so ignore it - (sigaction SIGPIPE - (lambda (arg) - (simple-format (current-error-port) "warning: SIGPIPE\n"))) - - (set-port-encoding! chunked-output-port "ISO-8859-1") - (callback chunked-output-port) - (close-port chunked-output-port) - - (let ((response (read-response port))) - (let ((body (read-response-body response))) - (close-port port) - (values response - body))))))))) - -(define* (retry-on-error f #:key times delay ignore) + open-socket-for-uri* + + call-with-sigint + run-server/patched + + timeout-error? + + port-read-timeout-error? + port-write-timeout-error? + with-port-timeouts)) + +(define* (retry-on-error f #:key times delay ignore error-hook) (let loop ((attempt 1)) (match (with-exception-handler (lambda (exn) @@ -259,15 +101,26 @@ upcoming chunk." times)) (apply values return-values)) ((#f . exn) - (if (>= attempt times) + (if (>= attempt + (- times 1)) (begin (simple-format (current-error-port) - "error: ~A:\n ~A,\n giving up after ~A attempts\n" + "error: ~A:\n ~A,\n attempt ~A of ~A, last retry in ~A\n" f exn - times) - (raise-exception exn)) + attempt + times + delay) + (when error-hook + (error-hook attempt exn)) + (sleep delay) + (simple-format + (current-error-port) + "running last retry of ~A after ~A failed attempts\n" + f + attempt) + (f)) (begin (simple-format (current-error-port) @@ -277,71 +130,11 @@ upcoming chunk." 
attempt times delay) + (when error-hook + (error-hook attempt exn)) (sleep delay) (loop (+ 1 attempt)))))))) -(define delay-logging-fluid - (make-thread-local-fluid)) -(define delay-logging-depth-fluid - (make-thread-local-fluid 0)) - -(define (log-delay proc duration) - (and=> (fluid-ref delay-logging-fluid) - (lambda (recorder) - (recorder proc duration)))) - -(define* (call-with-delay-logging proc #:key (threshold 1) (args '())) - (let ((start (get-internal-real-time)) - (trace '()) - (root-logger? (eq? #f (fluid-ref delay-logging-fluid)))) - - (define (format-seconds seconds) - (format #f "~4f" seconds)) - - (call-with-values - (lambda () - (with-fluid* delay-logging-depth-fluid - (+ 1 (fluid-ref delay-logging-depth-fluid)) - (lambda () - (if root-logger? - (with-fluid* delay-logging-fluid - (lambda (proc duration) - (set! trace - (cons (list proc - duration - (fluid-ref delay-logging-depth-fluid)) - trace)) - #t) - (lambda () - (apply proc args))) - (apply proc args))))) - (lambda vals - (let ((elapsed-seconds - (/ (- (get-internal-real-time) - start) - internal-time-units-per-second))) - (if (and (> elapsed-seconds threshold) - root-logger?) - (let ((lines - (cons - (simple-format #f "warning: delay of ~A seconds: ~A" - (format-seconds elapsed-seconds) - proc) - (map (match-lambda - ((proc duration depth) - (string-append - (make-string (* 2 depth) #\space) - (simple-format #f "~A: ~A" - (format-seconds duration) - proc)))) - trace)))) - (display (string-append - (string-join lines "\n") - "\n"))) - (unless root-logger? - ((fluid-ref delay-logging-fluid) proc elapsed-seconds)))) - (apply values vals))))) - (define (call-with-time-logging name thunk) (let ((start (current-time time-utc))) (call-with-values @@ -364,7 +157,9 @@ upcoming chunk." (define* (create-work-queue thread-count-parameter proc #:key thread-start-delay (thread-stop-delay - (make-time time-duration 0 0))) + (make-time time-duration 0 0)) + (name "unnamed") + priority<?) 
(let ((queue (make-q)) (queue-mutex (make-mutex)) (job-available (make-condition-variable)) @@ -384,11 +179,26 @@ upcoming chunk." (else thread-count-parameter))) - (define (process-job . args) - (with-mutex queue-mutex - (enq! queue args) - (start-new-threads-if-necessary (get-thread-count)) - (signal-condition-variable job-available))) + (define process-job + (if priority<? + (lambda* (args #:key priority) + (with-mutex queue-mutex + (enq! queue (cons priority args)) + (set-car! + queue + (stable-sort! (car queue) + (lambda (a b) + (priority<? + (car a) + (car b))))) + (sync-q! queue) + (start-new-threads-if-necessary (get-thread-count)) + (signal-condition-variable job-available))) + (lambda args + (with-mutex queue-mutex + (enq! queue args) + (start-new-threads-if-necessary (get-thread-count)) + (signal-condition-variable job-available))))) (define (count-threads) (with-mutex queue-mutex @@ -403,11 +213,12 @@ upcoming chunk." (define (list-jobs) (with-mutex queue-mutex - (append (list-copy - (car queue)) + (append (if priority<? + (map cdr (car queue)) + (list-copy (car queue))) (hash-fold (lambda (key val result) - (or (and val - (cons val result)) + (if val + (cons val result) result)) '() running-job-args)))) @@ -416,16 +227,17 @@ upcoming chunk." (with-exception-handler (lambda (exn) (simple-format (current-error-port) - "job raised exception: ~A\n" - job-args)) + "~A work queue, job raised exception ~A: ~A\n" + name job-args exn)) (lambda () (with-throw-handler #t (lambda () (apply proc job-args)) (lambda (key . args) - (simple-format (current-error-port) - "exception when handling job: ~A ~A\n" - key args) + (simple-format + (current-error-port) + "~A work queue, exception when handling job: ~A ~A\n" + name key args) (backtrace)))) #:unwind? #t)) @@ -453,6 +265,13 @@ upcoming chunk." 
(call-with-new-thread (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append name " q t " + (number->string thread-index)))) + (const #t)) + (let loop ((last-job-finished-at (current-time time-monotonic))) (lock-mutex queue-mutex) @@ -469,9 +288,13 @@ upcoming chunk." ;; the job in the mean time (if (q-empty? queue) #f - (deq! queue)) + (if priority<? + (cdr (deq! queue)) + (deq! queue))) #f) - (deq! queue)))) + (if priority<? + (cdr (deq! queue)) + (deq! queue))))) (if job-args (begin @@ -499,32 +322,38 @@ upcoming chunk." (define start-new-threads-if-necessary (let ((previous-thread-started-at (make-time time-monotonic 0 0))) (lambda (desired-count) - (let* ((thread-count - (hash-count (const #t) running-job-args)) - (threads-to-start - (- desired-count thread-count))) - (when (> threads-to-start 0) - (for-each - (lambda (thread-index) - (when (eq? (hash-ref running-job-args - thread-index - 'slot-free) - 'slot-free) - (let* ((now (current-time time-monotonic)) - (elapsed (time-difference now - previous-thread-started-at))) - (when (or (eq? #f thread-start-delay) - (time>=? elapsed thread-start-delay)) - (set! previous-thread-started-at now) - (hash-set! running-job-args - thread-index - #f) - (start-thread thread-index))))) - (iota desired-count))))))) + (let* ((thread-count + (hash-count (const #t) running-job-args)) + (threads-to-start + (- desired-count thread-count))) + (when (> threads-to-start 0) + (for-each + (lambda (thread-index) + (when (eq? (hash-ref running-job-args + thread-index + 'slot-free) + 'slot-free) + (let* ((now (current-time time-monotonic)) + (elapsed (time-difference now + previous-thread-started-at))) + (when (or (eq? #f thread-start-delay) + (time>=? elapsed thread-start-delay)) + (set! previous-thread-started-at now) + (hash-set! running-job-args + thread-index + #f) + (start-thread thread-index))))) + (iota desired-count))))))) (if (procedure? 
thread-count-parameter) (call-with-new-thread (lambda () + (catch 'system-error + (lambda () + (set-thread-name + (string-append name " q t"))) + (const #t)) + (while #t (sleep 15) (with-mutex queue-mutex @@ -565,83 +394,171 @@ falling back to en_US.utf8\n" (setlocale LC_ALL "")) #:unwind? #t)) -(define %worker-thread-args - (make-parameter #f)) - -(define* (make-worker-thread-channel initializer - #:key (parallelism 1) - (delay-logger (lambda _ #f)) - destructor - lifetime - (log-exception? (const #t))) - "Return a channel used to offload work to a dedicated thread. ARGS are the -arguments of the worker thread procedure." +(define-record-type <worker-thread-set> + (worker-thread-set channel arguments-parameter) + worker-thread-set? + (channel worker-thread-set-channel) + (arguments-parameter worker-thread-set-arguments-parameter)) + +(define* (make-worker-thread-set initializer + #:key (parallelism 1) + (delay-logger (lambda _ #f)) + (duration-logger (const #f)) + destructor + lifetime + (log-exception? (const #t)) + (expire-on-exception? #f) + (name "unnamed")) + (define param + (make-parameter #f)) + + (define (initializer/safe) + (let ((args + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running initializer in worker thread (~A): ~A:\n ~A\n" + name + initializer + exn) + #f) + (lambda () + (with-throw-handler #t + initializer + (lambda args + (backtrace)))) + #:unwind? #t))) + + (if args + args + ;; never give up, just keep retrying + (begin + (sleep 5) + (initializer/safe))))) + + (define (destructor/safe args) + (let ((success? + (with-exception-handler + (lambda (exn) + (simple-format + (current-error-port) + "exception running destructor in worker thread (~A): ~A:\n ~A\n" + name + initializer + exn) + #f) + (lambda () + (with-throw-handler #t + (lambda () + (apply destructor args) + #t) + (lambda _ + (backtrace)))) + #:unwind? #t))) + + (or success? 
+ #t + (begin + (sleep 5) + (destructor/safe args))))) + (let ((channel (make-channel))) (for-each - (lambda _ + (lambda (thread-index) (call-with-new-thread (lambda () - (let init ((args (initializer))) - (parameterize ((%worker-thread-args args)) + (catch 'system-error + (lambda () + (set-thread-name + (string-append + name " w t " + (number->string thread-index)))) + (const #t)) + + (let init ((args (initializer/safe))) + (parameterize ((param args)) (let loop ((current-lifetime lifetime)) - (match (get-message channel) - (((? channel? reply) sent-time (? procedure? proc)) - (let ((time-delay - (- (get-internal-real-time) - sent-time))) - (delay-logger (/ time-delay - internal-time-units-per-second)) - (put-message - reply - (let ((start-time (get-internal-real-time))) - (with-exception-handler - (lambda (exn) - (list 'worker-thread-error - (/ (- (get-internal-real-time) - start-time) - internal-time-units-per-second) - exn)) - (lambda () - (with-throw-handler #t - (lambda () - (call-with-values - (lambda () - (apply proc args)) - (lambda vals - (cons (/ (- (get-internal-real-time) - start-time) + (let ((exception? + (match (get-message channel) + (((? channel? reply) sent-time (? procedure? proc)) + (let ((time-delay + (- (get-internal-real-time) + sent-time))) + (delay-logger (/ time-delay internal-time-units-per-second) - vals)))) - (lambda args - (when (match args - (('%exception exn) - (log-exception? exn)) - (_ #t)) - (simple-format - (current-error-port) - "worker-thread: exception: ~A\n" args) - (backtrace))))) - #:unwind? #t)))))) - (if (number? 
current-lifetime) - (unless (< current-lifetime 0) - (loop (if current-lifetime - (- current-lifetime 1) - #f))) - (loop #f)))) + proc) + + (let* ((start-time (get-internal-real-time)) + (response + (with-exception-handler + (lambda (exn) + (list 'worker-thread-error + (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + exn)) + (lambda () + (with-throw-handler #t + (lambda () + (call-with-values + (lambda () + (apply proc args)) + (lambda vals + (cons (/ (- (get-internal-real-time) + start-time) + internal-time-units-per-second) + vals)))) + (lambda args + (when (match args + (('%exception exn) + (log-exception? exn)) + (_ #t)) + (simple-format + (current-error-port) + "worker-thread: exception: ~A\n" args) + (backtrace))))) + #:unwind? #t))) + (put-message reply + response) + + (match response + (('worker-thread-error duration _) + (when duration-logger + (duration-logger duration proc)) + #t) + ((duration . _) + (when duration-logger + (duration-logger duration proc)) + #f)))))))) + + (unless (and expire-on-exception? + exception?) + (if (number? current-lifetime) + (unless (< current-lifetime 0) + (loop (if current-lifetime + (- current-lifetime 1) + #f))) + (loop #f)))))) + (when destructor - (apply destructor args)) - (init (initializer)))))) + (destructor/safe args)) + + (init (initializer/safe)))))) (iota parallelism)) - channel)) -(define* (call-with-worker-thread channel proc #:key duration-logger) + (worker-thread-set channel + param))) + +(define* (call-with-worker-thread record proc #:key duration-logger) "Send PROC to the worker thread through CHANNEL. Return the result of PROC. If already in the worker thread, call PROC immediately." 
- (let ((args (%worker-thread-args))) + (let ((args ((worker-thread-set-arguments-parameter record)))) (if args (apply proc args) (let ((reply (make-channel))) - (put-message channel (list reply (get-internal-real-time) proc)) + (put-message (worker-thread-set-channel record) + (list reply (get-internal-real-time) proc)) (match (get-message reply) (('worker-thread-error duration exn) (when duration-logger @@ -651,3 +568,221 @@ If already in the worker thread, call PROC immediately." (when duration-logger (duration-logger duration)) (apply values result))))))) + +(define* (open-socket-for-uri* uri + #:key (verify-certificate? #t)) + (define tls-wrap + (@@ (web client) tls-wrap)) + + (define https? + (eq? 'https (uri-scheme uri))) + + (define plain-uri + (if https? + (build-uri + 'http + #:userinfo (uri-userinfo uri) + #:host (uri-host uri) + #:port (or (uri-port uri) 443) + #:path (uri-path uri) + #:query (uri-query uri) + #:fragment (uri-fragment uri)) + uri)) + + (let ((s (open-socket-for-uri plain-uri))) + (values + (if https? + (let ((port + (tls-wrap s (uri-host uri) + #:verify-certificate? verify-certificate?))) + ;; Guile/guile-gnutls don't handle the handshake happening on a non + ;; blocking socket, so change the behavior here. + (let ((flags (fcntl s F_GETFL))) + (fcntl s F_SETFL (logior O_NONBLOCK flags))) + port) + (let ((flags (fcntl s F_GETFL))) + (fcntl s F_SETFL (logior O_NONBLOCK flags)) + s)) + s))) + +;; Copied from (fibers web server) +(define (call-with-sigint thunk cvar) + (let ((handler #f)) + (dynamic-wind + (lambda () + (set! handler + (sigaction SIGINT (lambda (sig) (signal-condition! cvar))))) + thunk + (lambda () + (if handler + ;; restore Scheme handler, SIG_IGN or SIG_DFL. + (sigaction SIGINT (car handler) (cdr handler)) + ;; restore original C handler. + (sigaction SIGINT #f)))))) + +;; This variant of run-server from the fibers library supports running +;; multiple servers within one process. 
+(define run-server/patched + (let ((fibers-web-server-module + (resolve-module '(fibers web server)))) + + (define set-nonblocking! + (module-ref fibers-web-server-module 'set-nonblocking!)) + + (define make-default-socket + (module-ref fibers-web-server-module 'make-default-socket)) + + (define socket-loop + (module-ref fibers-web-server-module 'socket-loop)) + + (lambda* (handler + #:key + (host #f) + (family AF_INET) + (addr (if host + (inet-pton family host) + INADDR_LOOPBACK)) + (port 8080) + (socket (make-default-socket family addr port))) + ;; We use a large backlog by default. If the server is suddenly hit + ;; with a number of connections on a small backlog, clients won't + ;; receive confirmation for their SYN, leading them to retry -- + ;; probably successfully, but with a large latency. + (listen socket 1024) + (set-nonblocking! socket) + (sigaction SIGPIPE SIG_IGN) + (spawn-fiber (lambda () (socket-loop socket handler)))))) + +;; These procedure are subject to spurious wakeups. + +(define (readable? port) + "Test if PORT is writable." + (match (select (vector port) #() #() 0) + ((#() #() #()) #f) + ((#(_) #() #()) #t))) + +(define (writable? port) + "Test if PORT is writable." + (match (select #() (vector port) #() 0) + ((#() #() #()) #f) + ((#() #(_) #()) #t))) + +(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure) + (make-base-operation #f + (lambda _ + (and (ready? (port-ready-fd port)) values)) + (lambda (flag sched resume) + (define (commit) + (match (atomic-box-compare-and-swap! flag 'W 'S) + ('W (resume values)) + ('C (commit)) + ('S #f))) + (schedule-when-ready + sched (port-ready-fd port) commit)))) + +(define (wait-until-port-readable-operation port) + "Make an operation that will succeed when PORT is readable." + (unless (input-port? port) + (error "refusing to wait forever for input on non-input port")) + (make-wait-operation readable? 
schedule-task-when-fd-readable port + port-read-wait-fd + wait-until-port-readable-operation)) + +(define (wait-until-port-writable-operation port) + "Make an operation that will succeed when PORT is writable." + (unless (output-port? port) + (error "refusing to wait forever for output on non-output port")) + (make-wait-operation writable? schedule-task-when-fd-writable port + port-write-wait-fd + wait-until-port-writable-operation)) + + + +(define &port-timeout + (make-exception-type '&port-timeout + &external-error + '(port))) + +(define make-port-timeout-error + (record-constructor &port-timeout)) + +(define port-timeout-error? + (record-predicate &port-timeout)) + +(define &port-read-timeout + (make-exception-type '&port-read-timeout + &port-timeout + '())) + +(define make-port-read-timeout-error + (record-constructor &port-read-timeout)) + +(define port-read-timeout-error? + (record-predicate &port-read-timeout)) + +(define &port-write-timeout + (make-exception-type '&port-write-timeout + &port-timeout + '())) + +(define make-port-write-timeout-error + (record-constructor &port-write-timeout)) + +(define port-write-timeout-error? + (record-predicate &port-write-timeout)) + +(define* (with-port-timeouts thunk + #:key timeout + (read-timeout timeout) + (write-timeout timeout)) + (define (no-fibers-wait port mode timeout) + (define poll-timeout-ms 200) + + ;; When the GC runs, it restarts the poll syscall, but the timeout + ;; remains unchanged! When the timeout is longer than the time + ;; between the syscall restarting, I think this renders the + ;; timeout useless. Therefore, this code uses a short timeout, and + ;; repeatedly calls poll while watching the clock to see if it has + ;; timed out overall. 
+ (let ((timeout-internal + (+ (get-internal-real-time) + (* internal-time-units-per-second + (/ timeout 1000))))) + (let loop ((poll-value + (port-poll port mode poll-timeout-ms))) + (if (= poll-value 0) + (if (> (get-internal-real-time) + timeout-internal) + (raise-exception + (if (string=? mode "r") + (make-port-read-timeout-error port) + (make-port-write-timeout-error port))) + (loop (port-poll port mode poll-timeout-ms))) + poll-value)))) + + (parameterize + ((current-read-waiter + (lambda (port) + (if (current-scheduler) + (perform-operation + (choice-operation + (wait-until-port-readable-operation port) + (wrap-operation + (sleep-operation read-timeout) + (lambda () + (raise-exception + (make-port-read-timeout-error thunk port)))))) + (no-fibers-wait port "r" read-timeout)))) + (current-write-waiter + (lambda (port) + (if (current-scheduler) + (perform-operation + (choice-operation + (wait-until-port-writable-operation port) + (wrap-operation + (sleep-operation write-timeout) + (lambda () + (raise-exception + (make-port-write-timeout-error thunk port)))))) + (no-fibers-wait port "w" write-timeout))))) + (thunk))) diff --git a/scripts/nar-herder.in b/scripts/nar-herder.in index d1f95d1..fafcf9f 100644 --- a/scripts/nar-herder.in +++ b/scripts/nar-herder.in @@ -22,6 +22,11 @@ ;;; along with the guix-data-service. If not, see ;;; <http://www.gnu.org/licenses/>. +(when (and (string-prefix? 
"aarch64-" %host-type) + (not (getenv "GUILE_JIT_THRESHOLD"))) + (setenv "GUILE_JIT_THRESHOLD" "-1") + (apply execlp (car (command-line)) (command-line))) + (setvbuf (current-output-port) 'line) (setvbuf (current-error-port) 'line) @@ -36,9 +41,12 @@ (srfi srfi-19) (srfi srfi-37) (srfi srfi-43) + (srfi srfi-71) (ice-9 ftw) (ice-9 match) (ice-9 format) + (ice-9 threads) + (ice-9 suspendable-ports) (web uri) (web client) (web response) @@ -47,21 +55,20 @@ (logging port-log) (prometheus) (fibers) - (fibers conditions) - (fibers web server) ((guix ui) #:select (read/eval string->duration)) - (guix progress) + (guix store) (guix narinfo) + (guix progress) (guix derivations) ((guix store) #:select (store-path-hash-part)) ((guix build utils) #:select (dump-port)) (nar-herder utils) (nar-herder database) - (nar-herder recent-changes) (nar-herder storage) - (nar-herder mirror) (nar-herder server)) +(install-suspendable-ports!) + (define %valid-log-levels '(DEBUG INFO WARN ERROR)) @@ -80,12 +87,26 @@ (lambda (opt name arg result) (alist-cons 'storage arg - (alist-delete 'storage result)))))) + (alist-delete 'storage result)))) + + (option '("log-level") #t #f + (lambda (opt name arg result) + (alist-cons 'log-level + (let ((level (string->symbol (string-upcase arg)))) + (if (member level %valid-log-levels) + level + (error + (simple-format #f "unknown log level ~A\nvalid levels are: ~A\n" + level + %valid-log-levels)))) + (alist-delete 'log-level result)))))) (define %base-option-defaults ;; Alist of default option values `((database . ,(string-append (getcwd) "/nar_herder.db")) - (database-dump . ,(string-append (getcwd) "/nar_herder_dump.db")))) + (database-dump . ,(string-append (getcwd) "/nar_herder_dump.db")) + + (log-level . 
DEBUG))) (define %import-options (list (option '("tag") #t #f @@ -96,7 +117,10 @@ (cons key value))) (or (assq-ref result 'tags) '())) - (alist-delete 'tags result)))))) + (alist-delete 'tags result)))) + (option '("ensure-references-exist") #f #f + (lambda (opt name _ result) + (alist-cons 'ensure-references-exist #t result))))) (define %import-options-defaults '()) @@ -136,6 +160,111 @@ (call-with-input-string rest read)))) result))) + (option '("enable-cached-compression") #t #f + (lambda (opt name arg result) + (alist-cons 'cached-compression + (match (string-split arg #\:) + ((type) + `((type . ,(string->symbol type)))) + ((type level) + `((type . ,(string->symbol type)) + (level . ,(string->number level))))) + result))) + + (option '("cached-compression-directory") #t #f + (lambda (opt name arg result) + (alist-cons 'cached-compression-directory + (match (string-split arg #\=) + ((type directory) + (cons (string->symbol type) + (canonicalize-path directory)))) + result))) + + (option '("cached-compression-directory-max-size") #t #f + (lambda (opt name arg result) + (alist-cons 'cached-compression-directory-max-size + (match (string-split arg #\=) + ((type size) + (cons (string->symbol type) + (string->number size)))) + result))) + + (option '("cached-compression-min-uses") #t #f + (lambda (opt name arg result) + (alist-cons 'cached-compression-min-uses + (string->number arg) + (alist-delete 'cached-compression-min-uses + result)))) + + (option '("cached-compression-workers") #t #f + (lambda (opt name arg result) + (alist-cons 'cached-compression-workers + (string->number arg) + result))) + + (option '("cached-compression-nar-source") #t #f + (lambda (opt name arg result) + (alist-cons 'cached-compression-nar-source + arg + (alist-delete 'cached-compression-nar-source + result)))) + (option '("cached-compression-unused-removal-duration") #t #f + (lambda (opt name arg result) + (alist-cons + 'cached-compression-unused-removal-duration + (match (string-split arg 
#\=) + ((_) + (simple-format + (current-error-port) + "cached-compressions-unused-removal-duration: you must specify compression and value\n") + (exit 1)) + ((type duration-string) + (cons (string->symbol type) + (let ((duration (string->duration duration-string))) + (unless duration + (simple-format + (current-error-port) + "~A: cached-compressions-unused-removal-duration: invalid duration\n" + arg) + (exit 1)) + + duration)))) + result))) + (option '("cached-compressions-ttl") #t #f + (lambda (opt name arg result) + (alist-cons 'cached-compression-ttl + (match (string-split arg #\=) + ((_) + (simple-format + (current-error-port) + "cached-compressions-ttl: you must specify compression and value\n") + (exit 1)) + ((type ttl-string) + (let ((duration (string->duration ttl-string))) + (unless duration + (simple-format (current-error-port) + "~A: invalid duration\n" + arg) + (exit 1)) + + (cons (string->symbol type) + (time-second duration))))) + result))) + (option '("cached-compressions-new-ttl") #t #f + (lambda (opt name arg result) + (let ((duration (string->duration arg))) + (unless duration + (simple-format (current-error-port) + "~A: invalid duration\n" + arg) + (exit 1)) + (alist-cons 'cached-compression-new-ttl + (match (string-split arg #\=) + ((type size) + (cons (string->symbol type) + (time-second duration)))) + result)))) + (option '("ttl") #t #f (lambda (opt name arg result) (let ((duration (string->duration arg))) @@ -146,6 +275,16 @@ (exit 1)) (alist-cons 'narinfo-ttl (time-second duration) result)))) + (option '("new-ttl") #t #f + (lambda (opt name arg result) + (let ((duration (string->duration arg))) + (unless duration + (simple-format (current-error-port) + "~A: invalid duration\n" + arg) + (exit 1)) + (alist-cons 'new-narinfo-ttl (time-second duration) + result)))) (option '("negative-ttl") #t #f (lambda (opt name arg result) (let ((duration (string->duration arg))) @@ -169,26 +308,40 @@ arg (alist-delete 'mirror result)))) - (option 
'("log-level") #t #f + (option '("parallelism") #t #f (lambda (opt name arg result) - (alist-cons 'log-level - (let ((level (string->symbol (string-upcase arg)))) - (if (member level %valid-log-levels) - level - (error - (simple-format #f "unknown log level ~A\nvalid levels are: ~A\n" - level - %valid-log-levels)))) - (alist-delete 'log-level result)))))) + (alist-cons 'parallelism + (string->number arg) + (alist-delete 'parallelism result)))) + + (option '("database-reader-threads") #t #f + (lambda (opt name arg result) + (alist-cons 'database-reader-threads + (string->number arg) + (alist-delete 'database-reader-threads result)))))) (define %server-option-defaults - '((port . 8080) + `((port . 8080) (host . "0.0.0.0") - (log-level . DEBUG) - (storage-limit . "none") - (recent-changes-limit . 32768))) + + (cached-compression-workers . 2) + (cached-compression-min-uses . 3) + + (recent-changes-limit . 32768) + + (database-reader-threads . ,(min (max (current-processor-count) + 2) + 64)) + + (parallelism . 
1))) + +(define %check-options + (list)) + +(define %check-option-defaults + '()) (define (parse-options options defaults args) (args-fold @@ -245,19 +398,52 @@ (call-with-progress-reporter progress (lambda (report) - (for-each (lambda (narinfo) - (database-insert-narinfo - database - (call-with-input-file narinfo - (lambda (port) - ;; Set url to a dummy value as this doesn't - ;; matter - (read-narinfo port "https://narherderdummyvalue"))) - #:tags (or (assq-ref opts 'tags) - '())) - - (report)) - narinfos)))))) + (database-call-with-transaction + database + (lambda (db) + (let ((read-narinfos + (map + (lambda (narinfo-file) + (let ((narinfo + (call-with-input-file narinfo-file + (lambda (port) + ;; Set url to a dummy value as this doesn't + ;; matter + (read-narinfo port + "https://narherderdummyvalue"))))) + + (database-insert-narinfo + database + narinfo + #:tags (or (assq-ref opts 'tags) + '())) + + (report) + + narinfo)) + narinfos))) + + (when (assq-ref opts 'ensure-references-exist) + (for-each + (lambda (narinfo) + (let ((self-reference + (store-path-base + (narinfo-path narinfo)))) + (for-each + (lambda (reference) + (when (and + (not + (string=? reference self-reference)) + (not + (database-select-narinfo-by-hash + database + (string-take reference 32)))) + (error + (simple-format (current-error-port) + "missing reference to ~A\n" + reference)))) + (narinfo-references narinfo)))) + read-narinfos)))))))))) (("remove" rest ...) (let* ((opts (parse-options %base-options %base-option-defaults @@ -272,11 +458,14 @@ (port-log (make <port-log> #:port (current-output-port) #:formatter - (lambda (lvl time str) + ;; In guile-lib v0.2.8 onwards, the formatter is + ;; called with more arguments + (lambda args ; lvl, time, str (format #f "~a (~5a): ~a~%" - (strftime "%F %H:%M:%S" (localtime time)) - lvl - str))))) + (strftime "%F %H:%M:%S" (localtime + (second args))) + (first args) + (third args)))))) (add-handler! lgr port-log) (open-log! 
lgr) @@ -284,24 +473,91 @@ (for-each (lambda (store-path) - (log-msg 'INFO "removing " store-path) - - (if (assq-ref opts 'storage) - (begin - ;; Removing the files here isn't ideal, since the servers - ;; metrics won't be updated until the next get-nar-files call, - ;; but it avoids extra complexity in trying to have the server - ;; delete the files. - (remove-nar-files-by-hash - database - (assq-ref opts 'storage) - metrics-registry - (store-path-hash-part store-path))) - (log-msg - 'WARN "no --storage set, so just removing from the database")) - - (database-remove-narinfo database store-path)) + (let ((narinfo-details + (database-select-narinfo-by-hash + database + (store-path-hash-part store-path)))) + + (if narinfo-details + (let ((cached-narinfo-files + (database-select-cached-narinfo-files-by-narinfo-id + database + (assq-ref narinfo-details 'id)))) + + (log-msg 'INFO "removing " store-path) + + (if (assq-ref opts 'storage) + (begin + (remove-nar-files-by-hash + database + (assq-ref opts 'storage) + metrics-registry + (store-path-hash-part store-path) + #:error-unless-files-to-remove? #f)) + (log-msg + 'WARN "no --storage set, so just removing from the database")) + + (for-each + (lambda (cached-narinfo-details) + ;; It might not have been scheduled for + ;; removal, but remove any schedule that + ;; exists + (database-delete-scheduled-cached-narinfo-removal + database + (assq-ref cached-narinfo-details 'id)) + + ;; Remove all the database entries first, as + ;; that'll stop these files appearing in narinfos + (database-remove-cached-narinfo-file + database + narinfo-id + (symbol->string compression))) + cached-narinfo-files) + + (database-remove-narinfo database store-path)) + (log-msg 'WARN store-path " not found to remove")))) (assq-ref opts 'arguments)))) + (("check" rest ...) 
+ (let* ((opts (parse-options (append %base-options + %check-options) + (append %base-option-defaults + %check-option-defaults) + rest)) + (lgr (make <logger>)) + (port-log (make <port-log> + #:port (current-output-port) + #:formatter + ;; In guile-lib v0.2.8 onwards, the formatter is + ;; called with more arguments + (lambda args ; lvl, time, str + (format #f "~a (~5a): ~a~%" + (strftime "%F %H:%M:%S" (localtime + (second args))) + (first args) + (third args))))) + (metrics-registry (make-metrics-registry + #:namespace + "narherder"))) + + (add-handler! lgr port-log) + (open-log! lgr) + (set-default-logger! lgr) + + (let ((log-level (assq-ref opts 'log-level))) + (let loop ((levels %valid-log-levels)) + (when (and (not (null? levels)) + (not (eq? (car levels) log-level))) + (disable-log-level! lgr (car levels)) + (loop (cdr levels))))) + + (let* ((database (setup-database (assq-ref opts 'database) + metrics-registry)) + (canonical-storage (and=> (assq-ref opts 'storage) + canonicalize-path))) + + (check-storage database + canonical-storage + metrics-registry)))) (("run-server" rest ...) (simple-format (current-error-port) "locale is ~A\n" (check-locale!)) @@ -317,37 +573,14 @@ (port-log (make <port-log> #:port (current-output-port) #:formatter - (lambda (lvl time str) + ;; In guile-lib v0.2.8 onwards, the formatter is + ;; called with more arguments + (lambda args ; lvl, time, str (format #f "~a (~5a): ~a~%" - (strftime "%F %H:%M:%S" (localtime time)) - lvl - str)))) - (metrics-registry (make-metrics-registry - #:namespace - "narherder"))) - - (define (download-database) - (let ((database-uri - (string->uri - (string-append (assq-ref opts 'mirror) - "/latest-database-dump")))) - (call-with-values - (lambda () - (simple-format (current-error-port) - "starting downloading the database\n") - (http-get database-uri - #:decode-body? #f - #:streaming? 
#t)) - (lambda (response body) - (when (not (= (response-code response) 200)) - (error "unable to fetch database from mirror")) - - (call-with-output-file (assq-ref opts 'database) - (lambda (output-port) - (dump-port body output-port))) - - (simple-format (current-error-port) - "finished downloading the database\n"))))) + (strftime "%F %H:%M:%S" (localtime + (second args))) + (first args) + (third args)))))) (add-handler! lgr port-log) (open-log! lgr) @@ -378,73 +611,4 @@ (lambda (port) (simple-format port "~A\n" (getpid)))))) - (and=> - (assq-ref opts 'mirror) - (lambda (mirror) - (let ((database-file (assq-ref opts 'database))) - (if (file-exists? database-file) - (begin - ;; TODO Open the database, and check if the - ;; latest changes in the database are visible on - ;; the source to mirror. If they're not, then - ;; delete the database and download it to get - ;; back in sync - - #f) - (download-database))))) - - (let ((database (setup-database (assq-ref opts 'database) - metrics-registry)) - (canonical-storage (and=> (assq-ref opts 'storage) - canonicalize-path))) - - (when (not (file-exists? (assq-ref opts 'database-dump))) - (log-msg 'INFO "dumping database...") - (dump-database database (assq-ref opts 'database-dump))) - - (start-recent-change-removal-and-database-dump-thread - database - (assq-ref opts 'database-dump) - (* 24 3600) ; 24 hours - (assq-ref opts 'recent-changes-limit)) - - (and=> (assq-ref opts 'mirror) - (lambda (mirror) - (start-fetch-changes-thread database - canonical-storage - mirror - metrics-registry) - - (when (assq-ref opts 'storage) - (start-mirroring-thread database - mirror - (assq-ref opts 'storage-limit) - canonical-storage - metrics-registry)))) - - - (when (and (assq-ref opts 'storage) - (number? (assq-ref opts 'storage-limit))) - (start-nar-removal-thread database - canonical-storage - (assq-ref opts 'storage-limit) - metrics-registry - (filter-map - (match-lambda - ((key . val) - (if (eq? 
key 'storage-nar-removal-criteria) - val - #f))) - opts))) - - (log-msg 'INFO "starting server, listening on " - (assq-ref opts 'host) ":" (assq-ref opts 'port)) - (run-server - (make-request-handler database - canonical-storage - #:ttl (assq-ref opts 'narinfo-ttl) - #:negative-ttl (assq-ref opts 'narinfo-negative-ttl) - #:logger lgr - #:metrics-registry metrics-registry) - #:host (assq-ref opts 'host) - #:port (assq-ref opts 'port)))))) + (run-nar-herder-service opts lgr)))) |